Add a state machine driver framework
This is intended to simplify efficient driver implementation by
isolating ZooKeeper and other internal Nodepool logic from the cloud
implementation, while at the same time reducing the need for threads.

It is based on the simple driver interface, but by using state
machines, it can accommodate clouds (like Azure and OpenStack) which
require multiple steps to create an instance.

It's currently only suitable for use with a single pool, because the
launcher's pool workers are multiple threads. However, we should be
able to collapse them into one thread, which would make this safe for
use with multiple pools. We may also need to adjust some of the sleep
times in the launcher to accommodate the idea that the pool worker
threads will be more active in driving the state machine polling.

Change-Id: Ia179220a71653c9e261342a262ff1e23e5408215
@@ -203,3 +203,38 @@ See the ``gce`` provider for an example.
   :members:
.. autoclass:: nodepool.driver.simple.SimpleTaskManagerDriver
   :members:

State Machine Drivers
---------------------

.. note:: This system is still in development and lacks robust support
          for quotas or image building.

To use this system, you will need to implement a few subclasses.
First, create a :ref:`provider_config` subclass as you would for any
driver.

Then, subclass :py:class:`~nodepool.driver.statemachine.Instance` to
map remote instance data into a format the driver can understand.

Next, create two subclasses of
:py:class:`~nodepool.driver.statemachine.StateMachine` to
implement creating and deleting instances.

Subclass :py:class:`~nodepool.driver.statemachine.Adapter` to
implement the main methods that interact with the cloud.

Finally, subclass
:py:class:`~nodepool.driver.statemachine.StateMachineDriver` to tie
them all together.

See the ``example`` provider for an example.
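
As an illustrative sketch only (the ``MyCloud`` names here are made
up; the ``example`` provider is the authoritative reference), the
pieces fit together roughly like this:

.. code:: python

   from nodepool.driver.statemachine import Adapter, StateMachineDriver


   class MyCloudAdapter(Adapter):
       def __init__(self, provider_config):
           # A single long-lived cloud SDK client may be created here.
           self.provider = provider_config

       def getCreateStateMachine(self, hostname, label, metadata,
                                 retries):
           # MyCloudCreateStateMachine is a hypothetical StateMachine
           # subclass implementing the cloud's creation steps.
           return MyCloudCreateStateMachine(
               self, hostname, label, metadata, retries)

       def getDeleteStateMachine(self, external_id):
           # Likewise, a hypothetical StateMachine subclass for deletion.
           return MyCloudDeleteStateMachine(self, external_id)


   class MyCloudDriver(StateMachineDriver):
       def getProviderConfig(self, provider):
           # MyCloudProviderConfig is the driver's ProviderConfig
           # subclass, as described above.
           return MyCloudProviderConfig(self, provider)

       def getAdapter(self, provider_config):
           return MyCloudAdapter(provider_config)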

.. autoclass:: nodepool.driver.statemachine.Instance
   :members:
.. autoclass:: nodepool.driver.statemachine.StateMachine
   :members:
.. autoclass:: nodepool.driver.statemachine.Adapter
   :members:
.. autoclass:: nodepool.driver.statemachine.StateMachineDriver
   :members:
nodepool/driver/statemachine.py (new file, 564 lines)
@@ -0,0 +1,564 @@
# Copyright 2019 Red Hat
# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.


import time
import logging
import math

from nodepool.driver import Driver, NodeRequestHandler, Provider
from nodepool.driver.utils import QuotaInformation, QuotaSupport
from nodepool.nodeutils import iterate_timeout, nodescan
from nodepool import exceptions
from nodepool import zk
from kazoo import exceptions as kze


class StateMachineHandler(NodeRequestHandler):
    log = logging.getLogger("nodepool.driver.statemachine."
                            "StateMachineHandler")

    def __init__(self, pw, request):
        super().__init__(pw, request)
        self.state_machines = []

    @property
    def alive_thread_count(self):
        # Count state machines that are still in progress; this is the
        # state-machine analog of a running launch thread.
        return len([sm for sm in self.state_machines
                    if not sm[1].complete])

    def imagesAvailable(self):
        '''
        Determines if the requested images are available for this provider.

        :returns: True if they are available, False otherwise.
        '''
        return True

    def hasProviderQuota(self, node_types):
        '''
        Checks if a provider has enough quota to handle a list of nodes.
        This does not take our currently existing nodes into account.

        :param node_types: list of node types to check
        :return: True if the node list fits into the provider, False otherwise
        '''
        needed_quota = QuotaInformation()

        for ntype in node_types:
            needed_quota.add(
                self.manager.quotaNeededByLabel(ntype, self.pool))

        if hasattr(self.pool, 'ignore_provider_quota'):
            if not self.pool.ignore_provider_quota:
                cloud_quota = self.manager.estimatedNodepoolQuota()
                cloud_quota.subtract(needed_quota)

                if not cloud_quota.non_negative():
                    return False

        # Now calculate pool specific quota.  Values indicating no quota
        # default to math.inf, representing infinity, so they can be used
        # in calculations.
        pool_quota = QuotaInformation(
            cores=getattr(self.pool, 'max_cores', None),
            instances=self.pool.max_servers,
            ram=getattr(self.pool, 'max_ram', None),
            default=math.inf)
        pool_quota.subtract(needed_quota)
        return pool_quota.non_negative()

    def hasRemainingQuota(self, ntype):
        '''
        Checks if the predicted quota is enough for an additional node of
        type ntype.

        :param ntype: node type for the quota check
        :return: True if there is enough quota, False otherwise
        '''
        needed_quota = self.manager.quotaNeededByLabel(ntype, self.pool)

        # Calculate the remaining quota:
        # quota = <total nodepool quota> - <used quota> - <quota for node>
        cloud_quota = self.manager.estimatedNodepoolQuota()
        cloud_quota.subtract(
            self.manager.estimatedNodepoolQuotaUsed())
        cloud_quota.subtract(needed_quota)
        self.log.debug("Predicted remaining provider quota: %s",
                       cloud_quota)

        if not cloud_quota.non_negative():
            return False

        # Now calculate pool specific quota.  Values indicating no quota
        # default to math.inf, representing infinity, so they can be used
        # in calculations.
        pool_quota = QuotaInformation(
            cores=getattr(self.pool, 'max_cores', None),
            instances=self.pool.max_servers,
            ram=getattr(self.pool, 'max_ram', None),
            default=math.inf)
        pool_quota.subtract(
            self.manager.estimatedNodepoolQuotaUsed(self.pool))
        self.log.debug("Current pool quota: %s", pool_quota)
        pool_quota.subtract(needed_quota)
        self.log.debug("Predicted remaining pool quota: %s", pool_quota)

        return pool_quota.non_negative()
    def _gatherHostKeys(self, node):
        keys = []
        if self.pool.host_key_checking:
            try:
                if (node.connection_type == 'ssh' or
                        node.connection_type == 'network_cli'):
                    gather_hostkeys = True
                else:
                    gather_hostkeys = False
                keys = nodescan(node.interface_ip,
                                port=node.connection_port,
                                timeout=180,
                                gather_hostkeys=gather_hostkeys)
            except Exception:
                raise exceptions.LaunchKeyscanException(
                    "Can't scan instance %s key" % node.id)
        node.host_keys = keys

    def _updateNodeFromInstance(self, node, instance, gather_keys=False):
        if instance is None:
            return

        label = self.pool.labels[node.type[0]]

        if self.pool.use_internal_ip and instance.private_ipv4:
            server_ip = instance.private_ipv4
        else:
            server_ip = instance.interface_ip

        node.external_id = instance.external_id
        node.interface_ip = server_ip
        node.public_ipv4 = instance.public_ipv4
        node.private_ipv4 = instance.private_ipv4
        node.public_ipv6 = instance.public_ipv6
        node.region = instance.region
        node.az = instance.az
        node.username = label.cloud_image.username
        node.python_path = label.cloud_image.python_path
        node.connection_port = label.cloud_image.connection_port
        node.connection_type = label.cloud_image.connection_type

        if gather_keys:
            self._gatherHostKeys(node)

        self.zk.storeNode(node)
    def _runStateMachine(self, node, state_machine):
        if state_machine.complete:
            return True

        instance = None
        try:
            now = time.monotonic()
            if (now - state_machine.start_time >
                    self.manager.provider.boot_timeout):
                raise Exception("Timeout waiting for instance creation")
            instance = state_machine.advance()
            self.log.debug(f"State machine for {node.id} at "
                           f"{state_machine.state}")
            if not node.external_id:
                if not state_machine.external_id:
                    raise Exception("Driver implementation error: state "
                                    "machine must produce external ID "
                                    "after first advancement")
                node.external_id = state_machine.external_id
            if state_machine.complete:
                self.log.debug(f"Node {node.id} is ready")
                node.state = zk.READY
                self._updateNodeFromInstance(node, instance,
                                             gather_keys=True)
        except kze.SessionExpiredError:
            # Our node lock is gone, leaving the node state as BUILDING.
            # This will get cleaned up in ZooKeeper automatically, but we
            # must still set our cached node state to FAILED for the
            # NodeLaunchManager's poll() method.
            self.log.error(
                "Lost ZooKeeper session trying to launch for node %s",
                node.id)
            node.state = zk.FAILED
            node.external_id = state_machine.external_id
            # TODO: stats
            # statsd_key = 'error.zksession'
        except exceptions.QuotaException:
            self.log.info("Aborting node %s due to quota failure", node.id)
            node.state = zk.ABORTED
            node.external_id = state_machine.external_id
            self.zk.storeNode(node)
            # TODO: stats
            # statsd_key = 'error.quota'
        except Exception:
            self.log.exception(
                "Launch failed for node %s:", node.id)
            node.state = zk.FAILED
            node.external_id = state_machine.external_id
            self.zk.storeNode(node)

            # TODO: stats
            # if hasattr(e, 'statsd_key'):
            #     statsd_key = e.statsd_key
            # else:
            #     statsd_key = 'error.unknown'

        if node.state == zk.FAILED:
            state_machine.complete = True

        if state_machine.complete:
            return True
        return False

    def launchesComplete(self):
        '''
        Check if all launch requests have completed.

        When all of the Node objects have reached a final state (READY,
        FAILED or ABORTED), we'll know all launches have finished.
        '''
        all_complete = True
        for (node, state_machine) in self.state_machines:
            if not self._runStateMachine(node, state_machine):
                all_complete = False

        return all_complete
    def launch(self, node):
        label = self.pool.labels[node.type[0]]
        hostname = 'nodepool-' + node.id
        retries = self.manager.provider.launch_retries
        metadata = {'nodepool_node_id': node.id,
                    'nodepool_pool_name': self.pool.name,
                    'nodepool_provider_name': self.manager.provider.name}
        state_machine = self.manager.adapter.getCreateStateMachine(
            hostname, label, metadata, retries)
        self.state_machines.append((node, state_machine))


class StateMachineProvider(Provider, QuotaSupport):
    """The Provider implementation for the state machine driver
    framework"""
    log = logging.getLogger("nodepool.driver.statemachine."
                            "StateMachineProvider")

    def __init__(self, adapter, provider):
        super().__init__()
        self.provider = provider
        self.adapter = adapter
        self.delete_state_machines = {}
        self._zk = None

    def start(self, zk_conn):
        super().start(zk_conn)
        self._zk = zk_conn

    def getRequestHandler(self, poolworker, request):
        return StateMachineHandler(poolworker, request)

    def labelReady(self, label):
        return True

    def getProviderLimits(self):
        try:
            return self.adapter.getQuotaLimits()
        except NotImplementedError:
            return QuotaInformation(
                cores=math.inf,
                instances=math.inf,
                ram=math.inf,
                default=math.inf)

    def quotaNeededByLabel(self, ntype, pool):
        provider_label = pool.labels[ntype]
        try:
            return self.adapter.getQuotaForLabel(provider_label)
        except NotImplementedError:
            return QuotaInformation()

    def unmanagedQuotaUsed(self):
        '''
        Sums up the quota used by servers unmanaged by nodepool.

        :return: Calculated quota in use by unmanaged servers
        '''
        used_quota = QuotaInformation()

        node_ids = set([n.id for n in self._zk.nodeIterator()])

        for instance in self.adapter.listInstances():
            meta = instance.metadata
            nodepool_provider_name = meta.get('nodepool_provider_name')
            if (nodepool_provider_name and
                    nodepool_provider_name == self.provider.name):
                # This provider (regardless of the launcher) owns this
                # node, so it must not be accounted for as unmanaged
                # quota; unless it has leaked.
                nodepool_node_id = meta.get('nodepool_node_id')
                if nodepool_node_id and nodepool_node_id in node_ids:
                    # It has not leaked.
                    continue

            try:
                qi = instance.getQuotaInformation()
            except NotImplementedError:
                qi = QuotaInformation()
            used_quota.add(qi)

        return used_quota

    def cleanupNode(self, external_id):
        # TODO: This happens in a thread-per-node in the launcher
        # (above the driver level).  If we want to make this
        # single-threaded (yes), we'll need to modify the launcher
        # itself.
        state_machine = self.adapter.getDeleteStateMachine(
            external_id)
        self.delete_state_machines[external_id] = state_machine

    def waitForNodeCleanup(self, external_id, timeout=600):
        try:
            for count in iterate_timeout(
                    timeout, exceptions.ServerDeleteException,
                    "server %s deletion" % external_id):
                sm = self.delete_state_machines[external_id]
                sm.advance()
                self.log.debug(f"State machine for {external_id} at "
                               f"{sm.state}")
                if sm.complete:
                    return
        finally:
            self.delete_state_machines.pop(external_id, None)

    def cleanupLeakedResources(self):
        known_nodes = set()

        for node in self._zk.nodeIterator():
            if node.provider != self.provider.name:
                continue
            known_nodes.add(node.id)

        metadata = {'nodepool_provider_name': self.provider.name}
        self.adapter.cleanupLeakedResources(known_nodes, metadata)


# Driver implementation

class StateMachineDriver(Driver):
    """Entrypoint for a state machine driver"""

    def getProvider(self, provider_config):
        # Return a provider.
        # Usually this method does not need to be overridden.
        adapter = self.getAdapter(provider_config)
        return StateMachineProvider(adapter, provider_config)

    # Public interface

    def getProviderConfig(self, provider):
        """Instantiate a config object

        :param dict provider: A dictionary of YAML config describing
            the provider.
        :returns: A ProviderConfig instance with the parsed data.
        """
        raise NotImplementedError()

    def getAdapter(self, provider_config):
        """Instantiate an adapter

        :param ProviderConfig provider_config: An instance of
            ProviderConfig previously returned by
            :py:meth:`getProviderConfig`.
        :returns: An instance of :py:class:`Adapter`
        """
        raise NotImplementedError()


# Adapter public interface

class Instance:
    """Represents a cloud instance

    This class is used by the state machine driver classes to
    represent a standardized version of a remote cloud instance.
    Implement this class in your driver and supply as many of the
    fields as possible.

    The following attributes are required:

    * ready: bool (whether the instance is ready)
    * deleted: bool (whether the instance is in a deleted state)
    * external_id: str (the unique id of the instance)
    * interface_ip: str
    * metadata: dict

    The following are optional:

    * public_ipv4: str
    * public_ipv6: str
    * private_ipv4: str
    * az: str
    * region: str
    """
    def __init__(self):
        self.ready = False
        self.deleted = False
        self.external_id = None
        self.public_ipv4 = None
        self.public_ipv6 = None
        self.private_ipv4 = None
        self.interface_ip = None
        self.az = None
        self.region = None
        self.metadata = {}

    def __repr__(self):
        state = []
        if self.ready:
            state.append('ready')
        if self.deleted:
            state.append('deleted')
        state = ' '.join(state)
        return '<{klass} {external_id} {state}>'.format(
            klass=self.__class__.__name__,
            external_id=self.external_id,
            state=state)

    def getQuotaInformation(self):
        """Return quota information about this instance.

        :returns: A :py:class:`QuotaInformation` object.
        """
        raise NotImplementedError()
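

# Illustrative sketch, not part of the framework: a driver's Instance
# subclass typically copies fields from the cloud SDK's server object.
# The ``sdk_server`` attribute names below are assumptions about a
# hypothetical SDK, not a real API.
class ExampleCloudInstance(Instance):
    def __init__(self, sdk_server):
        super().__init__()
        self.ready = sdk_server.status == 'ACTIVE'
        self.deleted = sdk_server.status == 'DELETED'
        self.external_id = sdk_server.id
        self.public_ipv4 = sdk_server.public_v4
        self.private_ipv4 = sdk_server.private_v4
        self.interface_ip = self.public_ipv4 or self.private_ipv4
        self.metadata = sdk_server.metadata or {}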


class StateMachine:
    """Base class for the driver-supplied create and delete state
    machines; subclasses implement :py:meth:`advance`."""

    START = 'start'

    def __init__(self):
        self.state = self.START
        self.external_id = None
        self.complete = False
        self.start_time = time.monotonic()

    def advance(self):
        # Subclasses advance one step per call, setting ``complete``
        # (and, for creation, ``external_id``) as they progress.
        pass
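

# Illustrative sketch, not part of the framework: a creation state
# machine advances one step per poll.  The adapter helpers it calls
# (``_createServer``, ``_refreshServer``) are hypothetical, but the
# contract shown is the real one: set ``external_id`` on the first
# advancement, set ``complete`` when done, and return an Instance.
class ExampleCreateStateMachine(StateMachine):
    CREATING = 'creating instance'

    def __init__(self, adapter, hostname, label, metadata, retries):
        super().__init__()
        self.adapter = adapter
        self.hostname = hostname
        self.label = label
        self.metadata = metadata
        self.retries = retries
        self.server = None

    def advance(self):
        if self.state == self.START:
            # Issue the asynchronous create call and record the id.
            self.server = self.adapter._createServer(
                self.hostname, self.label, self.metadata)
            self.external_id = self.server.id
            self.state = self.CREATING
        if self.state == self.CREATING:
            self.server = self.adapter._refreshServer(self.server)
            if self.server.status != 'ACTIVE':
                # Not ready yet; try again on the next poll.
                return
            self.complete = True
            return ExampleCloudInstance(self.server)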


class Adapter:
    """Cloud adapter for the State Machine Driver

    This class will be instantiated once for each Nodepool provider.
    It may be discarded and replaced if the configuration changes.

    You may establish a single long-lived connection to the cloud in
    the initializer if you wish.

    :param ProviderConfig provider_config: A config object
        representing the provider.
    """
    def __init__(self, provider_config):
        pass

    def getCreateStateMachine(self, hostname, label, metadata, retries):
        """Return a state machine suitable for creating an instance

        This method should return a new state machine object
        initialized to create the described node.

        :param str hostname: The hostname of the node.
        :param ProviderLabel label: A config object representing the
            provider-label for the node.
        :param dict metadata: A dictionary of metadata that must be
            stored on the instance in the cloud.  The same data must be
            able to be returned later on :py:class:`Instance` objects
            returned from `listInstances`.
        :param int retries: The number of attempts which should be
            made to launch the node.

        :returns: A :py:class:`StateMachine` object.
        """
        raise NotImplementedError()

    def getDeleteStateMachine(self, external_id):
        """Return a state machine suitable for deleting an instance

        This method should return a new state machine object
        initialized to delete the described instance.

        :param str external_id: The external_id of the instance, as
            supplied by a creation StateMachine or an Instance.
        """
        raise NotImplementedError()
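
    # Illustrative sketch, not part of the framework: the returned
    # delete state machine usually polls until the instance is gone,
    # e.g. (hypothetical ``_deleteServer``/``_getServer`` helpers):
    #
    #     class ExampleDeleteStateMachine(StateMachine):
    #         def __init__(self, adapter, external_id):
    #             super().__init__()
    #             self.adapter = adapter
    #             self.external_id = external_id
    #
    #         def advance(self):
    #             if self.state == self.START:
    #                 self.adapter._deleteServer(self.external_id)
    #                 self.state = 'deleting'
    #             if self.state == 'deleting':
    #                 if self.adapter._getServer(self.external_id) is None:
    #                     self.complete = True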

    def listInstances(self):
        """Return an iterator of instances accessible to this provider.

        The yielded values should represent all instances accessible
        to this provider, not only those under the control of this
        adapter, but all visible instances, in order to achieve
        accurate quota calculation.

        :returns: A generator of :py:class:`Instance` objects.
        """
        raise NotImplementedError()

    def cleanupLeakedResources(self, known_nodes, metadata):
        """Delete any leaked resources

        Use the supplied metadata to delete any leaked resources.
        Typically you would list all of the instances in the provider,
        then list any other resources (floating IPs, etc).  Any
        resources with matching metadata not associated with an
        instance should be deleted, and any instances with matching
        metadata whose node id is not in known_nodes should also
        be deleted.

        Look up the `nodepool_node_id` metadata attribute on cloud
        instances to compare to known_nodes.

        :param set known_nodes: A set of string values representing
            known node ids.
        :param dict metadata: Metadata values to compare with cloud
            instances.
        """
        raise NotImplementedError()
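
    # Illustrative sketch, not part of the framework: an implementation
    # of the method above might compare instance metadata against
    # known_nodes like this (the ``_deleteServer`` helper is
    # hypothetical):
    #
    #     def cleanupLeakedResources(self, known_nodes, metadata):
    #         for instance in self.listInstances():
    #             meta = instance.metadata
    #             if any(meta.get(k) != v for k, v in metadata.items()):
    #                 continue  # Not owned by this provider.
    #             if meta.get('nodepool_node_id') in known_nodes:
    #                 continue  # Still tracked in ZooKeeper; not leaked.
    #             self._deleteServer(instance.external_id)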

    def getQuotaLimits(self):
        """Return the quota limits for this provider

        The default implementation returns a simple QuotaInformation
        with no limits.  Override this to provide accurate
        information.

        :returns: A :py:class:`QuotaInformation` object.
        """
        return QuotaInformation(default=math.inf)

    def getQuotaForLabel(self, label):
        """Return information about the quota used for a label

        The default implementation returns a simple QuotaInformation
        for one instance; override this to return more detailed
        information including cores and RAM.

        :param ProviderLabel label: A config object describing
            a label for an instance.

        :returns: A :py:class:`QuotaInformation` object.
        """
        return QuotaInformation(instances=1)
@@ -291,3 +291,37 @@ class QuotaSupport:
                self.log.exception("Couldn't consider invalid node %s "
                                   "for quota:" % node)
        return used_quota


class RateLimiter:
    """A rate limiter

    :param str name: The provider name; used in logging.
    :param float rate_limit: The rate limit expressed in
        requests per second.

    Example:

    .. code:: python

       rate_limiter = RateLimiter('provider', 1.0)
       with rate_limiter:
           api_call()
    """

    def __init__(self, name, rate_limit):
        self._running = True
        self.name = name
        self.delta = 1.0 / rate_limit
        self.last_ts = None

    def __enter__(self):
        if self.last_ts is None:
            return
        while True:
            delta = time.monotonic() - self.last_ts
            if delta >= self.delta:
                break
            time.sleep(self.delta - delta)

    def __exit__(self, etype, value, tb):
        self.last_ts = time.monotonic()
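
# Illustrative sketch, not part of this change: a state machine driver
# adapter could use RateLimiter to throttle its cloud API calls.  The
# ``rate`` config attribute and ``_sdk_list_servers`` helper below are
# assumptions:
#
#     class ExampleAdapter(Adapter):
#         def __init__(self, provider_config):
#             self.rate_limiter = RateLimiter(provider_config.name,
#                                             provider_config.rate)
#
#         def listInstances(self):
#             with self.rate_limiter:
#                 servers = self._sdk_list_servers()
#             for server in servers:
#                 yield ExampleCloudInstance(server)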