From 13104ab0fffc2ecefcad90a9a043064732d8c6ba Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Tue, 10 Dec 2019 15:47:39 -0800 Subject: [PATCH] Add Google Cloud provider Also add a TaskManager and a SimpleTaskManagerDriver. Change-Id: I5c44b24600838ae9afcc6a39c482c67933548bc0 --- doc/source/configuration.rst | 275 +++++++++++++++++- doc/source/devguide.rst | 56 ++++ nodepool/driver/gce/__init__.py | 27 ++ nodepool/driver/gce/adapter.py | 115 ++++++++ nodepool/driver/gce/config.py | 250 ++++++++++++++++ nodepool/driver/simple.py | 488 ++++++++++++++++++++++++++++++++ nodepool/driver/taskmanager.py | 187 ++++++++++++ requirements.txt | 1 + 8 files changed, 1386 insertions(+), 13 deletions(-) create mode 100644 nodepool/driver/gce/__init__.py create mode 100644 nodepool/driver/gce/adapter.py create mode 100644 nodepool/driver/gce/config.py create mode 100644 nodepool/driver/simple.py create mode 100644 nodepool/driver/taskmanager.py diff --git a/doc/source/configuration.rst b/doc/source/configuration.rst index cdd628168..554f4bdac 100644 --- a/doc/source/configuration.rst +++ b/doc/source/configuration.rst @@ -365,17 +365,17 @@ Options The driver type. - .. value:: openstack + .. value:: aws For details on the extra options required and provided by the - OpenStack driver, see the separate section - :attr:`providers.[openstack]` + AWS driver, see the separate section + :attr:`providers.[aws]` - .. value:: static + .. value:: gce For details on the extra options required and provided by the - static driver, see the separate section - :attr:`providers.[static]` + GCE driver, see the separate section + :attr:`providers.[gce]` .. value:: kubernetes @@ -389,18 +389,24 @@ Options openshift driver, see the separate section :attr:`providers.[openshift]` - .. value:: aws - - For details on the extra options required and provided by the - AWS driver, see the separate section - :attr:`providers.[aws]` - .. value:: openshiftpods For details on the extra options required and provided by the openshiftpods driver, see the separate section :attr:`providers.[openshiftpods]` + .. value:: openstack + + For details on the extra options required and provided by the + OpenStack driver, see the separate section + :attr:`providers.[openstack]` + + .. value:: static + + For details on the extra options required and provided by the + static driver, see the separate section + :attr:`providers.[static]` + OpenStack Driver ---------------- @@ -1631,7 +1637,7 @@ section of the configuration. :type: list Each entry in this section must refer to an entry in the - :attr:`providers.[aws].cloud-images` section. + :attr:`labels` section. .. code-block:: yaml @@ -1811,3 +1817,246 @@ section of the configuration. .. _`AWS region`: https://docs.aws.amazon.com/general/latest/gr/rande.html .. _`Boto configuration`: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html .. _`Boto describe images`: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ec2.html#EC2.Client.describe_images + +.. _gce_driver: + +Google Cloud Compute Engine (GCE) Driver +---------------------------------------- + +Selecting the gce driver adds the following options to the :attr:`providers` +section of the configuration. + +.. attr-overview:: + :prefix: providers.[gce] + :maxdepth: 3 + +.. 
attr:: providers.[gce]
+   :type: list
+
+   A GCE provider's resources are partitioned into groups called `pools`
+   (see :attr:`providers.[gce].pools` for details), and within a pool,
+   the node types which are to be made available are listed
+   (see :attr:`providers.[gce].pools.labels` for details).
+
+   See `Application Default Credentials`_ for information on how to
+   configure credentials and other settings for GCE access in
+   Nodepool's runtime environment.
+
+   .. note:: For documentation purposes the option names are prefixed
+             ``providers.[gce]`` to disambiguate from other
+             drivers, but ``[gce]`` is not required in the
+             configuration (e.g. below
+             ``providers.[gce].pools`` refers to the ``pools``
+             key in the ``providers`` section when the ``gce``
+             driver is selected).
+
+   Example:
+
+   .. code-block:: yaml
+
+     - name: gce-uscentral1
+       driver: gce
+       project: nodepool-123456
+       region: us-central1
+       zone: us-central1-a
+       cloud-images:
+         - name: debian-stretch
+           image-project: debian-cloud
+           image-family: debian-9
+           username: zuul
+           key: ssh-rsa ...
+       pools:
+         - name: main
+           max-servers: 8
+           labels:
+             - name: debian-stretch
+               instance-type: f1-micro
+               cloud-image: debian-stretch
+               volume-type: standard
+               volume-size: 10
+
+   .. attr:: name
+      :required:
+
+      A unique name for this provider configuration.
+
+   .. attr:: region
+      :required:
+
+      Name of the region to use; see `GCE regions and zones`_.
+
+   .. attr:: zone
+      :required:
+
+      Name of the zone to use; see `GCE regions and zones`_.
+
+   .. attr:: boot-timeout
+      :type: int seconds
+      :default: 60
+
+      Once an instance is active, how long to try connecting to the
+      instance via SSH. If the timeout is exceeded, the node launch is
+      aborted and the instance deleted.
+
+   .. attr:: launch-retries
+      :default: 3
+
+      The number of times to retry launching a node before considering
+      the job failed.
+
+   .. attr:: cloud-images
+      :type: list
+
+      Each entry in this section must refer to an entry in the
+      :attr:`labels` section.
+
+      .. code-block:: yaml
+
+        cloud-images:
+          - name: debian-stretch
+            image-project: debian-cloud
+            image-family: debian-9
+            username: zuul
+            key: ssh-rsa ...
+
+      Each entry is a dictionary with the following keys:
+
+      .. attr:: name
+         :type: string
+         :required:
+
+         Identifier used to refer to this cloud-image from the
+         :attr:`providers.[gce].pools.labels` section.
+
+      .. attr:: image-id
+         :type: str
+
+         If this is provided, it is used to select the image from the cloud
+         provider by ID.
+
+      .. attr:: image-project
+         :type: str
+
+         If :attr:`providers.[gce].cloud-images.image-id` is not
+         provided, this is used along with
+         :attr:`providers.[gce].cloud-images.image-family` to find an
+         image.
+
+      .. attr:: image-family
+         :type: str
+
+         If :attr:`providers.[gce].cloud-images.image-id` is not
+         provided, this is used along with
+         :attr:`providers.[gce].cloud-images.image-project` to find an
+         image.
+
+      .. attr:: username
+         :type: str
+
+         The username that a consumer should use when connecting to the node.
+
+      .. attr:: key
+         :type: str
+
+         An SSH public key to add to the instance (project global keys
+         are added automatically).
+
+      .. attr:: python-path
+         :type: str
+         :default: auto
+
+         The path of the default python interpreter. Used by Zuul to set
+         ``ansible_python_interpreter``. The special value ``auto`` will
+         direct Zuul to use inbuilt Ansible logic to select the
+         interpreter on Ansible >=2.8, and default to
+         ``/usr/bin/python2`` for earlier versions.
+
+      .. attr:: connection-type
+         :type: str
+
+         The connection type that a consumer should use when connecting to the
+         node. For most images this is not necessary. However when creating
+         Windows images this could be 'winrm' to enable access via Ansible.
+
+      .. attr:: connection-port
+         :type: int
+         :default: 22 / 5986
+
+         The port that a consumer should use when connecting to the node. For
+         most images this is not necessary. This defaults to 22 for ssh and
+         5986 for winrm.
+
+   .. attr:: pools
+      :type: list
+
+      A pool defines a group of resources from a GCE provider. Each pool has a
+      maximum number of nodes which can be launched from it, along with a number
+      of cloud-related attributes used when launching nodes.
+
+      .. attr:: name
+         :required:
+
+         A unique name within the provider for this pool of resources.
+
+      .. attr:: host-key-checking
+         :type: bool
+         :default: True
+
+         Specify custom behavior of validation of SSH host keys. When set to
+         False, nodepool-launcher will not ssh-keyscan nodes after they are
+         booted. This might be needed if nodepool-launcher and the nodes it
+         launches are on different networks. The default value is True.
+
+      .. attr:: labels
+         :type: list
+
+         Each entry in a pool's `labels` section indicates that the
+         corresponding label is available for use in this pool. When creating
+         nodes for a label, the flavor-related attributes in that label's
+         section will be used.
+
+         .. code-block:: yaml
+
+           labels:
+             - name: debian
+               instance-type: f1-micro
+               cloud-image: debian-stretch
+
+         Each entry is a dictionary with the following keys:
+
+         .. attr:: name
+            :type: str
+            :required:
+
+            Identifier used to refer to this label.
+
+         .. attr:: cloud-image
+            :type: str
+            :required:
+
+            Refers to the name of an externally managed image in the
+            cloud that already exists on the provider. The value of
+            ``cloud-image`` should match the ``name`` of a previously
+            configured entry from the ``cloud-images`` section of the
+            provider. See :attr:`providers.[gce].cloud-images`.
+
+         .. attr:: instance-type
+            :type: str
+            :required:
+
+            Name of the flavor to use. See `GCE machine types`_.
+
+         .. attr:: volume-type
+            :type: string
+
+            If given, the root volume type (``standard`` or ``ssd``).
+
+         .. attr:: volume-size
+            :type: int
+
+            If given, the size of the root volume, in GiB.
+
+
+.. _`Application Default Credentials`: https://cloud.google.com/docs/authentication/production
+.. _`GCE regions and zones`: https://cloud.google.com/compute/docs/regions-zones/
+.. _`GCE machine types`: https://cloud.google.com/compute/docs/machine-types
diff --git a/doc/source/devguide.rst b/doc/source/devguide.rst
index 71f78a0c7..cd3b33f13 100644
--- a/doc/source/devguide.rst
+++ b/doc/source/devguide.rst
@@ -82,6 +82,8 @@ Those objects are referenced from the Driver main interface that needs
 to be implemented in the __init__.py file of the driver directory.
 
+.. _provider_config:
+
 ProviderConfig
 ~~~~~~~~~~~~~~
 
@@ -147,3 +149,57 @@ The launch procedure usually consists of the following operations:
    Once an external_id is obtained, it should be stored to the node.external_id.
 - Once the resource is created, READY should be stored to the node.state.
   Otherwise raise an exception to restart the launch attempt.
+
+TaskManager
+-----------
+
+If you need to use a thread-unsafe client library, or you need to
+manage rate limiting in your driver, you may want to use the
+:py:class:`~nodepool.driver.taskmanager.TaskManager` class.
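+For example, a remote API call can be wrapped in a task and invoked
+like this (a minimal sketch; the ``adapter`` object and its
+``get_server`` method are hypothetical):
+
+.. code-block:: python
+
+   class GetServerTask(Task):
+       name = 'get_server'
+
+       def main(self, manager):
+           # Keyword arguments given to the constructor are
+           # available on self.args.
+           return self.args['adapter'].get_server(self.args['server_id'])
+
+   # submitTask() returns the task; wait() blocks until the task has
+   # run and returns its result (or raises its exception).
+   task = task_manager.submitTask(
+       GetServerTask(adapter=adapter, server_id='example-id'))
+   server = task.wait()
+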
Implement +any remote API calls as tasks and invoke them by submitting the tasks +to the TaskManager. It will run them sequentially from a single +thread, and assist in rate limiting. + +The :py:class:`~nodepool.driver.taskmanager.BaseTaskManagerProvider` +class is a subclass of :py:class:`~nodepool.driver.Provider` which +starts and stops a TaskManager automatically. Inherit from it to +build a Provider as described above with a TaskManager. + +.. autoclass:: nodepool.driver.taskmanager.Task + :members: +.. autoclass:: nodepool.driver.taskmanager.TaskManager + :members: +.. autoclass:: nodepool.driver.taskmanager.BaseTaskManagerProvider + + +Simple Drivers +-------------- + +If your system is simple enough, you may be able to use the +SimpleTaskManagerDriver class to implement support with just a few +methods. In order to use this class, your system must create and +delete instances as a unit (without requiring multiple resource +creation calls such as volumes or floating IPs). + +.. note:: This system is still in development and lacks robust support + for quotas or image building. + +To use this system, you will need to implement a few subclasses. +First, create a :ref:`provider_config` subclass as you would for any +driver. Then, subclass +:py:class:`~nodepool.driver.simple.SimpleTaskManagerInstance` to map +remote instance data into a format the simple driver can understand. +Next, subclass +:py:class:`~nodepool.driver.simple.SimpleTaskManagerAdapter` to +implement the main API methods of your provider. Finally, subclass +:py:class:`~nodepool.driver.simple.SimpleTaskManagerDriver` to tie them +all together. + +See the ``gce`` provider for an example. + +.. autoclass:: nodepool.driver.simple.SimpleTaskManagerInstance + :members: +.. autoclass:: nodepool.driver.simple.SimpleTaskManagerAdapter + :members: +.. autoclass:: nodepool.driver.simple.SimpleTaskManagerDriver + :members: diff --git a/nodepool/driver/gce/__init__.py b/nodepool/driver/gce/__init__.py new file mode 100644 index 000000000..e0c3e5be8 --- /dev/null +++ b/nodepool/driver/gce/__init__.py @@ -0,0 +1,27 @@ +# Copyright 2019 Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. + +from nodepool.driver.simple import SimpleTaskManagerDriver +from nodepool.driver.gce.config import GCEProviderConfig +from nodepool.driver.gce.adapter import GCEAdapter + + +class GCEDriver(SimpleTaskManagerDriver): + def getProviderConfig(self, provider): + return GCEProviderConfig(self, provider) + + def getAdapter(self, provider_config): + return GCEAdapter(provider_config) diff --git a/nodepool/driver/gce/adapter.py b/nodepool/driver/gce/adapter.py new file mode 100644 index 000000000..c860412cc --- /dev/null +++ b/nodepool/driver/gce/adapter.py @@ -0,0 +1,115 @@ +# Copyright 2019 Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from nodepool.driver.simple import SimpleTaskManagerAdapter +from nodepool.driver.simple import SimpleTaskManagerInstance + +import googleapiclient.discovery + + +class GCEInstance(SimpleTaskManagerInstance): + def load(self, data): + if data['status'] == 'TERMINATED': + self.deleted = True + elif data['status'] == 'RUNNING': + self.ready = True + self.external_id = data['name'] + self.az = data['zone'] + + iface = data.get('networkInterfaces', []) + if len(iface): + self.private_ipv4 = iface[0].get('networkIP') + access = iface[0].get('accessConfigs', []) + if len(access): + self.public_ipv4 = access[0].get('natIP') + self.interface_ip = self.public_ipv4 or self.private_ipv4 + + if data.get('metadata'): + for item in data['metadata'].get('items', []): + self.metadata[item['key']] = item['value'] + + +class GCEAdapter(SimpleTaskManagerAdapter): + log = logging.getLogger("nodepool.driver.gce.GCEAdapter") + + def __init__(self, provider): + self.provider = provider + self.compute = googleapiclient.discovery.build('compute', 'v1') + + def listInstances(self, task_manager): + servers = [] + + q = self.compute.instances().list(project=self.provider.project, + zone=self.provider.zone) + with task_manager.rateLimit(): + result = q.execute() + + for instance in result.get('items', []): + servers.append(GCEInstance(instance)) + return servers + + def deleteInstance(self, task_manager, server_id): + q = self.compute.instances().delete(project=self.provider.project, + zone=self.provider.zone, + instance=server_id) + with task_manager.rateLimit(): + q.execute() + + def _getImageId(self, task_manager, cloud_image): + image_id = cloud_image.image_id + + if image_id: + return image_id + + if cloud_image.image_family: + q = self.compute.images().getFromFamily( + project=cloud_image.image_project, + family=cloud_image.image_family) + with task_manager.rateLimit(): + result = q.execute() + image_id = result['selfLink'] + + return image_id + + def createInstance(self, task_manager, hostname, metadata, label): + image_id = self._getImageId(task_manager, label.cloud_image) + disk = dict(boot=True, + autoDelete=True, + initializeParams=dict(sourceImage=image_id)) + machine_type = 'zones/{}/machineTypes/{}'.format( + self.provider.zone, label.instance_type) + network = dict(network='global/networks/default', + accessConfigs=[dict( + type='ONE_TO_ONE_NAT', + name='External NAT')]) + metadata_items = [] + for (k, v) in metadata.items(): + metadata_items.append(dict(key=k, value=v)) + meta = dict(items=metadata_items) + args = dict( + name=hostname, + machineType=machine_type, + disks=[disk], + networkInterfaces=[network], + serviceAccounts=[], + metadata=meta) + q = self.compute.instances().insert( + project=self.provider.project, + zone=self.provider.zone, + body=args) + with task_manager.rateLimit(): + q.execute() + return hostname diff --git a/nodepool/driver/gce/config.py b/nodepool/driver/gce/config.py new file mode 100644 index 000000000..a2d4dc793 --- /dev/null +++ b/nodepool/driver/gce/config.py @@ -0,0 +1,250 @@ +# Copyright 2018-2019 Red Hat +# +# Licensed under the Apache 
License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+#
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import voluptuous as v
+
+from nodepool.driver import ConfigPool
+from nodepool.driver import ConfigValue
+from nodepool.driver import ProviderConfig
+
+
+class ProviderCloudImage(ConfigValue):
+    def __init__(self):
+        self.name = None
+        self.image_id = None
+        self.image_project = None
+        self.image_family = None
+        self.username = None
+        self.key = None
+        self.python_path = None
+        self.connection_type = None
+        self.connection_port = None
+
+    def __eq__(self, other):
+        if isinstance(other, ProviderCloudImage):
+            return (self.name == other.name
+                    and self.image_id == other.image_id
+                    and self.image_project == other.image_project
+                    and self.image_family == other.image_family
+                    and self.username == other.username
+                    and self.key == other.key
+                    and self.python_path == other.python_path
+                    and self.connection_type == other.connection_type
+                    and self.connection_port == other.connection_port)
+        return False
+
+    def __repr__(self):
+        return "<ProviderCloudImage %s>" % self.name
+
+    @property
+    def external_name(self):
+        '''Human readable version of external.'''
+        return self.image_id or self.name
+
+
+class ProviderLabel(ConfigValue):
+    def __init__(self):
+        self.name = None
+        self.cloud_image = None
+        self.instance_type = None
+        self.volume_size = None
+        self.volume_type = None
+        # The ProviderPool object that owns this label.
+        self.pool = None
+
+    def __eq__(self, other):
+        if isinstance(other, ProviderLabel):
+            # NOTE(Shrews): We intentionally do not compare 'pool' here
+            # since this causes recursive checks with ProviderPool.
+            return (other.name == self.name
+                    and other.cloud_image == self.cloud_image
+                    and other.instance_type == self.instance_type
+                    and other.volume_size == self.volume_size
+                    and other.volume_type == self.volume_type)
+        return False
+
+    def __repr__(self):
+        return "<ProviderLabel %s>" % self.name
+
+
+class ProviderPool(ConfigPool):
+    def __init__(self):
+        self.name = None
+        self.host_key_checking = True
+        self.labels = None
+        # The ProviderConfig object that owns this pool.
+        self.provider = None
+
+        # Initialize base class attributes
+        super().__init__()
+
+    def load(self, pool_config, full_config, provider):
+        super().load(pool_config)
+        self.name = pool_config['name']
+        self.provider = provider
+
+        self.host_key_checking = bool(
+            pool_config.get('host-key-checking', True))
+
+        for label in pool_config.get('labels', []):
+            pl = ProviderLabel()
+            pl.name = label['name']
+            pl.pool = self
+            self.labels[pl.name] = pl
+            cloud_image_name = label.get('cloud-image', None)
+            if cloud_image_name:
+                cloud_image = self.provider.cloud_images.get(
+                    cloud_image_name, None)
+                if not cloud_image:
+                    raise ValueError(
+                        "cloud-image %s does not exist in provider %s"
+                        " but is referenced in label %s" %
+                        (cloud_image_name, provider.name, pl.name))
+            else:
+                cloud_image = None
+            pl.cloud_image = cloud_image
+            pl.instance_type = label['instance-type']
+            pl.volume_type = label.get('volume-type', 'standard')
+            pl.volume_size = label.get('volume-size', 10)
+            full_config.labels[label['name']].pools.append(self)
+
+    def __eq__(self, other):
+        if isinstance(other, ProviderPool):
+            # NOTE(Shrews): We intentionally do not compare 'provider' here
+            # since this causes recursive checks with GCEProviderConfig.
+            return (super().__eq__(other)
+                    and other.name == self.name
+                    and other.host_key_checking == self.host_key_checking
+                    and other.labels == self.labels)
+        return False
+
+    def __repr__(self):
+        return "<ProviderPool %s>" % self.name
+
+
+class GCEProviderConfig(ProviderConfig):
+    def __init__(self, driver, provider):
+        self.driver_object = driver
+        self.__pools = {}
+        self.region = None
+        self.boot_timeout = None
+        self.launch_retries = None
+        self.project = None
+        self.zone = None
+        self.cloud_images = {}
+        self.rate_limit = None
+        super().__init__(provider)
+
+    def __eq__(self, other):
+        if isinstance(other, GCEProviderConfig):
+            return (super().__eq__(other)
+                    and other.region == self.region
+                    and other.pools == self.pools
+                    and other.boot_timeout == self.boot_timeout
+                    and other.launch_retries == self.launch_retries
+                    and other.cloud_images == self.cloud_images
+                    and other.project == self.project
+                    and other.rate_limit == self.rate_limit
+                    and other.zone == self.zone)
+        return False
+
+    @property
+    def pools(self):
+        return self.__pools
+
+    @property
+    def manage_images(self):
+        # Currently we have no image management for google. This should
+        # be updated if that changes.
+        return False
+
+    @staticmethod
+    def reset():
+        pass
+
+    def load(self, config):
+        self.region = self.provider.get('region')
+        self.boot_timeout = self.provider.get('boot-timeout', 60)
+        self.launch_retries = self.provider.get('launch-retries', 3)
+        self.project = self.provider.get('project')
+        self.zone = self.provider.get('zone')
+        self.rate_limit = self.provider.get('rate-limit', 1)
+
+        default_port_mapping = {
+            'ssh': 22,
+            'winrm': 5986,
+        }
+        # TODO: diskimages
+
+        for image in self.provider.get('cloud-images', []):
+            i = ProviderCloudImage()
+            i.name = image['name']
+            i.image_id = image.get('image-id', None)
+            i.image_project = image.get('image-project', None)
+            i.image_family = image.get('image-family', None)
+            i.username = image.get('username', None)
+            i.key = image.get('key', None)
+            i.python_path = image.get('python-path', 'auto')
+            i.connection_type = image.get('connection-type', 'ssh')
+            i.connection_port = image.get(
+                'connection-port',
+                default_port_mapping.get(i.connection_type, 22))
+            self.cloud_images[i.name] = i
+
+        for pool in self.provider.get('pools', []):
+            pp = ProviderPool()
+            pp.load(pool, config, self)
+            self.pools[pp.name] = pp
+
+    def getSchema(self):
+        pool_label = {
+            v.Required('name'): str,
+            v.Required('cloud-image'): str,
+            v.Required('instance-type'): str,
+            'volume-type': str,
+            'volume-size': int
+        }
+
+        pool = ConfigPool.getCommonSchemaDict()
+        pool.update({
+            v.Required('name'): str,
+            v.Required('labels'): [pool_label],
+        })
+
+        provider_cloud_images = {
+            'name': str,
+            'connection-type': str,
+            'connection-port': int,
+            'image-id': str,
+            'image-project': str,
+            'image-family': str,
+            'username': str,
+            'key': str,
+            'python-path': str,
+        }
+
+        provider = ProviderConfig.getCommonSchemaDict()
+        provider.update({
+            v.Required('pools'): [pool],
+            v.Required('region'): str,
+            v.Required('project'): str,
+            v.Required('zone'): str,
+            'cloud-images': [provider_cloud_images],
+            'boot-timeout': int,
+            'launch-retries': int,
+            'rate-limit': int,
+        })
+        return v.Schema(provider)
+
+    def getSupportedLabels(self, pool_name=None):
+        labels = set()
+        for pool in self.pools.values():
+            if not pool_name or (pool.name == pool_name):
+                labels.update(pool.labels.keys())
+        return labels
diff --git a/nodepool/driver/simple.py b/nodepool/driver/simple.py
new file mode 100644
index 000000000..aea9a6a25
--- /dev/null
+++ b/nodepool/driver/simple.py
@@ -0,0 +1,488 @@
+# Copyright 2019 Red Hat
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
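+
+# This module provides the "simple driver" framework: a driver
+# supplies an adapter implementing create/delete/list instance calls,
+# and the classes below wrap it with the Provider, NodeLauncher and
+# request-handler plumbing, using a TaskManager to serialize and
+# rate-limit the remote API calls.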
+
+import time
+import logging
+import math
+
+from nodepool.driver.taskmanager import BaseTaskManagerProvider, Task
+from nodepool.driver import Driver, NodeRequestHandler
+from nodepool.driver.utils import NodeLauncher, QuotaInformation
+from nodepool.nodeutils import iterate_timeout, nodescan
+from nodepool import exceptions
+from nodepool import zk
+
+
+# Private support classes
+
+class CreateInstanceTask(Task):
+    name = 'create_instance'
+
+    def main(self, manager):
+        return self.args['adapter'].createInstance(
+            manager, self.args['hostname'], self.args['metadata'],
+            self.args['label_config'])
+
+
+class DeleteInstanceTask(Task):
+    name = 'delete_instance'
+
+    def main(self, manager):
+        return self.args['adapter'].deleteInstance(
+            manager, self.args['external_id'])
+
+
+class ListInstancesTask(Task):
+    name = 'list_instances'
+
+    def main(self, manager):
+        return self.args['adapter'].listInstances(manager)
+
+
+class SimpleTaskManagerLauncher(NodeLauncher):
+    """The NodeLauncher implementation for the SimpleTaskManager driver
+    framework"""
+    def __init__(self, handler, node, provider_config, provider_label):
+        super().__init__(handler.zk, node, provider_config)
+        self.provider_name = provider_config.name
+        self.retries = provider_config.launch_retries
+        self.pool = provider_config.pools[provider_label.pool.name]
+        self.handler = handler
+        self.zk = handler.zk
+        self.boot_timeout = provider_config.boot_timeout
+        self.label = provider_label
+
+    def launch(self):
+        self.log.debug("Starting %s instance" % self.node.type)
+        attempts = 1
+        hostname = 'nodepool-' + self.node.id
+        tm = self.handler.manager.task_manager
+        adapter = self.handler.manager.adapter
+        metadata = {'nodepool_node_id': self.node.id,
+                    'nodepool_pool_name': self.pool.name,
+                    'nodepool_provider_name': self.provider_name}
+        if self.label.cloud_image.key:
+            metadata['ssh-keys'] = '{}:{}'.format(
+                self.label.cloud_image.username,
+                self.label.cloud_image.key)
+        while attempts <= self.retries:
+            try:
+                t = tm.submitTask(CreateInstanceTask(
+                    adapter=adapter, hostname=hostname,
+                    metadata=metadata,
+                    label_config=self.label))
+                external_id = t.wait()
+                break
+            except Exception:
+                if attempts <= self.retries:
+                    self.log.exception(
+                        "Launch attempt %d/%d failed for node %s:",
+                        attempts, self.retries, self.node.id)
+                if attempts == self.retries:
+                    raise
+                attempts += 1
+                time.sleep(1)
+
+        self.node.external_id = external_id
+        self.zk.storeNode(self.node)
+
+        for count in iterate_timeout(
+                self.boot_timeout, exceptions.LaunchStatusException,
+                "server %s creation" % external_id):
+            instance = self.handler.manager.getInstance(external_id)
+            if instance and instance.ready:
+                break
+
+        self.log.debug("Created instance %s", repr(instance))
+
+        server_ip = instance.interface_ip
+
+        self.node.connection_port = self.label.cloud_image.connection_port
+        self.node.connection_type = self.label.cloud_image.connection_type
+        keys = []
+        if self.pool.host_key_checking:
+            try:
+                if (self.node.connection_type == 'ssh' or
+                        self.node.connection_type == 'network_cli'):
+                    gather_hostkeys = True
+                else:
+                    gather_hostkeys = False
+                keys = nodescan(server_ip, port=self.node.connection_port,
+                                timeout=180, gather_hostkeys=gather_hostkeys)
+            except Exception:
+                raise exceptions.LaunchKeyscanException(
+                    "Can't scan instance %s key" % hostname)
+
+        self.node.state = zk.READY
+        self.node.external_id = hostname
+        self.node.hostname = hostname
+        self.node.interface_ip = server_ip
+
self.node.public_ipv4 = instance.public_ipv4 + self.node.private_ipv4 = instance.private_ipv4 + self.node.public_ipv6 = instance.public_ipv6 + self.node.region = instance.region + self.node.az = instance.az + self.node.host_keys = keys + self.node.username = self.label.cloud_image.username + self.node.python_path = self.label.cloud_image.python_path + self.zk.storeNode(self.node) + self.log.info("Instance %s is ready", hostname) + + +class SimpleTaskManagerHandler(NodeRequestHandler): + log = logging.getLogger("nodepool.driver.simple." + "SimpleTaskManagerHandler") + + def __init__(self, pw, request): + super().__init__(pw, request) + self._threads = [] + + @property + def alive_thread_count(self): + count = 0 + for t in self._threads: + if t.isAlive(): + count += 1 + return count + + def imagesAvailable(self): + ''' + Determines if the requested images are available for this provider. + + :returns: True if it is available, False otherwise. + ''' + return True + + def hasProviderQuota(self, node_types): + ''' + Checks if a provider has enough quota to handle a list of nodes. + This does not take our currently existing nodes into account. + + :param node_types: list of node types to check + :return: True if the node list fits into the provider, False otherwise + ''' + # TODO: Add support for real quota handling; this only handles + # max_servers. + needed_quota = QuotaInformation( + cores=1, + instances=len(node_types), + ram=1, + default=1) + pool_quota = QuotaInformation( + cores=math.inf, + instances=self.pool.max_servers, + ram=math.inf, + default=math.inf) + pool_quota.subtract(needed_quota) + self.log.debug("hasProviderQuota({},{}) = {}".format( + self.pool, node_types, pool_quota)) + return pool_quota.non_negative() + + def hasRemainingQuota(self, ntype): + ''' + Checks if the predicted quota is enough for an additional node of type + ntype. + + :param ntype: node type for the quota check + :return: True if there is enough quota, False otherwise + ''' + # TODO: Add support for real quota handling; this only handles + # max_servers. + needed_quota = QuotaInformation(cores=1, instances=1, ram=1, default=1) + n_running = self.manager.countNodes(self.provider.name, self.pool.name) + pool_quota = QuotaInformation( + cores=math.inf, + instances=self.pool.max_servers - n_running, + ram=math.inf, + default=math.inf) + pool_quota.subtract(needed_quota) + self.log.debug("hasRemainingQuota({},{}) = {}".format( + self.pool, ntype, pool_quota)) + return pool_quota.non_negative() + + def launchesComplete(self): + ''' + Check if all launch requests have completed. + + When all of the Node objects have reached a final state (READY or + FAILED), we'll know all threads have finished the launch process. + ''' + if not self._threads: + return True + + # Give the NodeLaunch threads time to finish. + if self.alive_thread_count: + return False + + node_states = [node.state for node in self.nodeset] + + # NOTE: It is very important that NodeLauncher always sets one + # of these states, no matter what. 
+        if not all(s in (zk.READY, zk.FAILED) for s in node_states):
+            return False
+
+        return True
+
+    def launch(self, node):
+        label = self.pool.labels[node.type[0]]
+        thd = SimpleTaskManagerLauncher(self, node, self.provider, label)
+        thd.start()
+        self._threads.append(thd)
+
+
+class SimpleTaskManagerProvider(BaseTaskManagerProvider):
+    """The Provider implementation for the SimpleTaskManager driver
+    framework"""
+    def __init__(self, adapter, provider):
+        super().__init__(provider)
+        self.adapter = adapter
+        self.node_cache_time = 0
+        self.node_cache = []
+        self._zk = None
+
+    def start(self, zk_conn):
+        super().start(zk_conn)
+        self._zk = zk_conn
+
+    def getRequestHandler(self, poolworker, request):
+        return SimpleTaskManagerHandler(poolworker, request)
+
+    def labelReady(self, label):
+        return True
+
+    def cleanupNode(self, external_id):
+        instance = self.getInstance(external_id)
+        if (not instance) or instance.deleted:
+            raise exceptions.NotFound()
+        t = self.task_manager.submitTask(DeleteInstanceTask(
+            adapter=self.adapter, external_id=external_id))
+        t.wait()
+
+    def waitForNodeCleanup(self, external_id, timeout=600):
+        for count in iterate_timeout(
+                timeout, exceptions.ServerDeleteException,
+                "server %s deletion" % external_id):
+            instance = self.getInstance(external_id)
+            if (not instance) or instance.deleted:
+                return
+
+    def cleanupLeakedResources(self):
+        deleting_nodes = {}
+
+        for node in self._zk.nodeIterator():
+            if node.state == zk.DELETING:
+                if node.provider != self.provider.name:
+                    continue
+                if node.provider not in deleting_nodes:
+                    deleting_nodes[node.provider] = []
+                deleting_nodes[node.provider].append(node.external_id)
+
+        for server in self.listNodes():
+            meta = server.metadata
+            if meta.get('nodepool_provider_name') != self.provider.name:
+                # Not our responsibility
+                continue
+
+            if (server.external_id in
+                deleting_nodes.get(self.provider.name, [])):
+                # Already deleting this node
+                continue
+
+            if not self._zk.getNode(meta['nodepool_node_id']):
+                self.log.warning(
+                    "Marking for delete leaked instance %s in %s "
+                    "(unknown node id %s)",
+                    server.external_id, self.provider.name,
+                    meta['nodepool_node_id']
+                )
+                # Create an artificial node to use for deleting the server.
+                node = zk.Node()
+                node.external_id = server.external_id
+                node.provider = self.provider.name
+                node.state = zk.DELETING
+                self._zk.storeNode(node)
+
+    def listNodes(self):
+        now = time.monotonic()
+        if now - self.node_cache_time > 5:
+            t = self.task_manager.submitTask(ListInstancesTask(
+                adapter=self.adapter))
+            nodes = t.wait()
+            self.node_cache = nodes
+            self.node_cache_time = time.monotonic()
+        return self.node_cache
+
+    def countNodes(self, provider_name, pool_name):
+        return len(
+            [n for n in self.listNodes() if
+             n.metadata.get('nodepool_provider_name') == provider_name and
+             n.metadata.get('nodepool_pool_name') == pool_name])
+
+    def getInstance(self, external_id):
+        for candidate in self.listNodes():
+            if (candidate.external_id == external_id):
+                return candidate
+        return None
+
+
+# Public interface below
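+
+# A driver built on this framework ties the public classes below
+# together roughly as follows (a sketch; ``ExampleDriver``,
+# ``ExampleProviderConfig`` and ``ExampleAdapter`` are hypothetical --
+# see nodepool/driver/gce/__init__.py for a real example):
+#
+#     class ExampleDriver(SimpleTaskManagerDriver):
+#         def getProviderConfig(self, provider):
+#             return ExampleProviderConfig(self, provider)
+#
+#         def getAdapter(self, provider_config):
+#             return ExampleAdapter(provider_config)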
+ """ + + def __init__(self, data): + self.ready = False + self.deleted = False + self.external_id = None + self.public_ipv4 = None + self.public_ipv6 = None + self.private_ipv4 = None + self.interface_ip = None + self.az = None + self.region = None + self.metadata = {} + self.load(data) + + def __repr__(self): + state = [] + if self.ready: + state.append('ready') + if self.deleted: + state.append('deleted') + state = ' '.join(state) + return '<{klass} {external_id} {state}>'.format( + klass=self.__class__.__name__, + external_id=self.external_id, + state=state) + + def load(self, data): + """Parse data and update this object's attributes + + :param data: An opaque data object which was passed to the + constructor. + + Override this method and extract data from the `data` + parameter. + + The following attributes are required: + + * ready: bool (whether the instance is ready) + * deleted: bool (whether the instance is in a deleted state) + * external_id: str (the unique id of the instance) + * interface_ip: str + * metadata: dict + + The following are optional: + + * public_ipv4: str + * public_ipv6: str + * private_ipv4: str + * az: str + * region: str + """ + raise NotImplementedError() + + +class SimpleTaskManagerAdapter: + """Public interface for the simple TaskManager Provider + + Implement these methods as simple synchronous calls, and pass this + class to the SimpleTaskManagerDriver class. + + You can establish a single long-lived connection in the + initializer. The provider will call methods on this object from a + single thread. + + All methods accept a task_manager argument. Use this to control + rate limiting: + + .. code:: python + + with task_manager.rateLimit(): + + """ + def __init__(self, provider): + pass + + def createInstance(self, task_manager, hostname, metadata, label_config): + """Create an instance + + :param TaskManager task_manager: An instance of + :py:class:`~nodepool.driver.taskmananger.TaskManager`. + :param str hostname: The intended hostname for the instance. + :param dict metadata: A dictionary of key/value pairs that + must be stored on the instance. + :param ProviderLabel label_config: A LabelConfig object describing + the instance which should be created. + """ + raise NotImplementedError() + + def deleteInstance(self, task_manager, external_id): + """Delete an instance + + :param TaskManager task_manager: An instance of + :py:class:`~nodepool.driver.taskmananger.TaskManager`. + :param str external_id: The id of the cloud instance. + """ + raise NotImplementedError() + + def listInstances(self, task_manager): + """Return a list of instances + + :param TaskManager task_manager: An instance of + :py:class:`~nodepool.driver.taskmananger.TaskManager`. + :returns: A list of :py:class:`SimpleTaskManagerInstance` objects. + """ + raise NotImplementedError() + + +class SimpleTaskManagerDriver(Driver): + """Subclass this to make a simple driver""" + + def getProvider(self, provider_config): + """Return a provider. + + Usually this method does not need to be overridden. + """ + adapter = self.getAdapter(provider_config) + return SimpleTaskManagerProvider(adapter, provider_config) + + # Public interface + + def getProviderConfig(self, provider): + """Instantiate a config object + + :param dict provider: A dictionary of YAML config describing + the provider. + :returns: A ProviderConfig instance with the parsed data. 
+ """ + raise NotImplementedError() + + def getAdapter(self, provider_config): + """Instantiate an adapter + + :param ProviderConfig provider_config: An instance of + ProviderConfig previously returned by :py:meth:`getProviderConfig`. + :returns: An instance of :py:class:`SimpleTaskManagerAdapter` + """ + raise NotImplementedError() diff --git a/nodepool/driver/taskmanager.py b/nodepool/driver/taskmanager.py new file mode 100644 index 000000000..7a44bc8d1 --- /dev/null +++ b/nodepool/driver/taskmanager.py @@ -0,0 +1,187 @@ +# Copyright 2019 Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import time +import logging +import queue +import threading + +from nodepool.driver import Provider + + +class Task: + """Base task class for use with :py:class:`TaskManager` + + Subclass this to implement your own tasks. + + Set the `name` field to the name of your task and override the + :py:meth:`main` method. + + Keyword arguments to the constructor are stored on `self.args` for + use by the :py:meth:`main` method. + + """ + name = "task_name" + + def __init__(self, **kw): + self._wait_event = threading.Event() + self._exception = None + self._traceback = None + self._result = None + self.args = kw + + def done(self, result): + self._result = result + self._wait_event.set() + + def exception(self, e): + self._exception = e + self._wait_event.set() + + def wait(self): + """Call this method after submitting the task to the TaskManager to + receieve the results.""" + self._wait_event.wait() + if self._exception: + raise self._exception + return self._result + + def run(self, manager): + try: + self.done(self.main(manager)) + except Exception as e: + self.exception(e) + + def main(self, manager): + """Implement the work of the task + + :param TaskManager manager: The instance of + :py:class:`TaskManager` running this task. + + Arguments passed to the constructor are available as `self.args`. + """ + pass + + +class StopTask(Task): + name = "stop_taskmanager" + + def main(self, manager): + manager._running = False + + +class RateLimitContextManager: + def __init__(self, task_manager): + self.task_manager = task_manager + + def __enter__(self): + if self.task_manager.last_ts is None: + return + while True: + delta = time.monotonic() - self.task_manager.last_ts + if delta >= self.task_manager.delta: + break + time.sleep(self.task_manager.delta - delta) + + def __exit__(self, etype, value, tb): + self.task_manager.last_ts = time.monotonic() + + +class TaskManager: + """A single-threaded task dispatcher + + This class is meant to be instantiated by a Provider in order to + execute remote API calls from a single thread with rate limiting. + + :param str name: The name of the TaskManager (usually the provider name) + used in logging. + :param float rate_limit: The rate limit of the task manager expressed in + requests per second. 
+ """ + log = logging.getLogger("nodepool.driver.taskmanager.TaskManager") + + def __init__(self, name, rate_limit): + self._running = True + self.name = name + self.queue = queue.Queue() + self.delta = 1.0 / rate_limit + self.last_ts = None + + def rateLimit(self): + """Return a context manager to perform rate limiting. Use as follows: + + .. code: python + + with task_manager.rateLimit(): + + """ + return RateLimitContextManager(self) + + def submitTask(self, task): + """Submit a task to the task manager. + + :param Task task: An instance of a subclass of :py:class:`Task`. + :returns: The submitted task for use in function chaning. + """ + self.queue.put(task) + return task + + def stop(self): + """Stop the task manager.""" + self.submitTask(StopTask()) + + def run(self): + try: + while True: + task = self.queue.get() + if not task: + if not self._running: + break + continue + self.log.debug("Manager %s running task %s (queue %s)" % + (self.name, task.name, self.queue.qsize())) + task.run(self) + self.queue.task_done() + except Exception: + self.log.exception("Task manager died") + raise + + +class BaseTaskManagerProvider(Provider): + """Subclass this to build a Provider with an included taskmanager""" + + log = logging.getLogger("nodepool.driver.taskmanager.TaskManagerProvider") + + def __init__(self, provider): + self.provider = provider + self.thread = None + self.task_manager = TaskManager(provider.name, provider.rate_limit) + + def start(self, zk_conn): + self.log.debug("Starting") + if self.thread is None: + self.log.debug("Starting thread") + self.thread = threading.Thread(target=self.task_manager.run) + self.thread.start() + + def stop(self): + self.log.debug("Stopping") + if self.thread is not None: + self.log.debug("Stopping thread") + self.task_manager.stop() + + def join(self): + self.log.debug("Joining") + if self.thread is not None: + self.thread.join() diff --git a/requirements.txt b/requirements.txt index d6aed967b..2b079fc9c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,3 +17,4 @@ Paste WebOb>=1.8.1 openshift<=0.8.9 boto3 +google-api-python-client