Replace shade and os-client-config with openstacksdk.
os-client-config is now just a wrapper around openstacksdk, and the shade
code has been imported into openstacksdk. To reduce complexity, just use
openstacksdk directly.

openstacksdk's TaskManager has had to grow some features to deal with
SwiftService. Making nodepool's TaskManager a subclass of openstacksdk's
TaskManager ensures that we get the thread pool set up properly.

Change-Id: I3a01eb18ae31cc3b61509984f3817378db832b47
commit fc1f80b6d1
parent b8aa756515
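For orientation, a minimal sketch of the API migration this diff performs. The cloud name, server arguments, and error handling below are illustrative only, not taken from nodepool:

# Old pattern (three packages):
#   import os_client_config
#   import shade
#   config = os_client_config.OpenStackConfig().get_one_cloud(cloud='mycloud')
#   client = shade.OpenStackCloud(cloud_config=config, manager=task_manager)

# New pattern (openstacksdk only); 'mycloud' is a hypothetical clouds.yaml entry.
import openstack
from openstack.config import loader

config = loader.OpenStackConfig().get_one(cloud='mycloud')
conn = openstack.connection.Connection(config=config, app_name='nodepool')

try:
    # The Connection object exposes the former shade cloud-layer calls,
    # e.g. create_server(); these kwargs are illustrative.
    server = conn.create_server(name='example-node', image='example-image',
                                flavor='example-flavor', wait=False)
except openstack.exceptions.BadRequestException:
    # shade.OpenStackCloudBadRequest becomes BadRequestException.
    raise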
@@ -230,7 +230,7 @@ def openConfig(path):
 def loadConfig(config_path):
     config = openConfig(config_path)
 
-    # Call driver config reset now to clean global hooks like os_client_config
+    # Call driver config reset now to clean global hooks like openstacksdk
     for driver in Drivers.drivers.values():
         driver.reset()
 
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import os_client_config
+from openstack.config import loader
 
 from nodepool.driver import Driver
 from nodepool.driver.fake.config import FakeProviderConfig
@@ -25,7 +25,7 @@ class FakeDriver(Driver):
         self.reset()
 
     def reset(self):
-        self.os_client_config = os_client_config.OpenStackConfig()
+        self.openstack_config = loader.OpenStackConfig()
 
     def getProviderConfig(self, provider):
         return FakeProviderConfig(self, provider)
@@ -19,7 +19,7 @@ import threading
 import time
 import uuid
 
-import shade
+import openstack
 
 from nodepool import exceptions
 from nodepool.driver.openstack.provider import OpenStackProvider
@@ -39,11 +39,11 @@ class Dummy(object):
             setattr(self, k, v)
         try:
             if self.should_fail:
-                raise shade.OpenStackCloudException('This image has '
-                                                    'SHOULD_FAIL set to True.')
+                raise openstack.exceptions.OpenStackCloudException(
+                    'This image has SHOULD_FAIL set to True.')
             if self.over_quota:
-                raise shade.exc.OpenStackCloudHTTPError(
-                    'Quota exceeded for something', 403)
+                raise openstack.exceptions.HttpException(
+                    message='Quota exceeded for something', http_status=403)
         except AttributeError:
             pass
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import os_client_config
+from openstack.config import loader
 
 from nodepool.driver import Driver
 from nodepool.driver.openstack.config import OpenStackProviderConfig
@@ -25,7 +25,7 @@ class OpenStackDriver(Driver):
         self.reset()
 
     def reset(self):
-        self.os_client_config = os_client_config.OpenStackConfig()
+        self.openstack_config = loader.OpenStackConfig()
 
     def getProviderConfig(self, provider):
         return OpenStackProviderConfig(self, provider)
@@ -195,8 +195,8 @@ class OpenStackProviderConfig(ProviderConfig):
 
     def load(self, config):
         cloud_kwargs = self._cloudKwargs()
-        occ = self.driver_object.os_client_config
-        self.cloud_config = occ.get_one_cloud(**cloud_kwargs)
+        openstack_config = self.driver_object.openstack_config
+        self.cloud_config = openstack_config.get_one(**cloud_kwargs)
 
         self.image_type = self.cloud_config.config['image_format']
         self.region_name = self.provider.get('region-name')
@@ -19,7 +19,7 @@ import logging
 import operator
 import time
 
-import shade
+import openstack
 
 from nodepool import exceptions
 from nodepool.driver import Provider
@@ -43,7 +43,7 @@ class OpenStackProvider(Provider):
         self.provider = provider
         self._images = {}
         self._networks = {}
-        self.__flavors = {}
+        self.__flavors = {}  # TODO(gtema): caching
         self.__azs = None
         self._use_taskmanager = use_taskmanager
         self._taskmanager = None
@@ -51,7 +51,7 @@ class OpenStackProvider(Provider):
 
     def start(self, zk_conn):
         if self._use_taskmanager:
-            self._taskmanager = TaskManager(None, self.provider.name,
+            self._taskmanager = TaskManager(self.provider.name,
                                             self.provider.rate)
             self._taskmanager.start()
         self.resetClient()
@@ -67,6 +67,7 @@ class OpenStackProvider(Provider):
     def getRequestHandler(self, poolworker, request):
         return handler.OpenStackNodeRequestHandler(poolworker, request)
 
+    # TODO(gtema): caching
     @property
     def _flavors(self):
         if not self.__flavors:
@@ -78,12 +79,12 @@ class OpenStackProvider(Provider):
             manager = self._taskmanager
         else:
             manager = None
-        return shade.OpenStackCloud(
-            cloud_config=self.provider.cloud_config,
-            manager=manager,
+        return openstack.connection.Connection(
+            config=self.provider.cloud_config,
+            task_manager=manager,
             app_name='nodepool',
-            app_version=version.version_info.version_string(),
-            **self.provider.cloud_config.config)
+            app_version=version.version_info.version_string()
+        )
 
     def quotaNeededByNodeType(self, ntype, pool):
         provider_label = pool.labels[ntype]
@@ -196,19 +197,15 @@ class OpenStackProvider(Provider):
 
     def resetClient(self):
         self._client = self._getClient()
-        if self._use_taskmanager:
-            self._taskmanager.setClient(self._client)
 
     def _getFlavors(self):
         flavors = self.listFlavors()
         flavors.sort(key=operator.itemgetter('ram'))
         return flavors
 
-    # TODO(mordred): These next three methods duplicate logic that is in
-    #                shade, but we can't defer to shade until we're happy
-    #                with using shade's resource caching facility. We have
-    #                not yet proven that to our satisfaction, but if/when
-    #                we do, these should be able to go away.
+    # TODO(gtema): These next three methods duplicate logic that is in
+    #              openstacksdk, caching is not enabled there by default
+    #              Remove it when caching is default
     def _findFlavorByName(self, flavor_name):
         for f in self._flavors:
             if flavor_name in (f['name'], f['id']):
@@ -226,6 +223,16 @@
         # Note: this will throw an error if the provider is offline
         # but all the callers are in threads (they call in via CreateServer) so
         # the mainloop won't be affected.
+        # TODO(gtema): enable commented block when openstacksdk has caching
+        # enabled by default
+        # if min_ram:
+        #     return self._client.get_flavor_by_ram(
+        #         ram=min_ram,
+        #         include=flavor_name,
+        #         get_extra=False)
+        # else:
+        #     return self._client.get_flavor(flavor_name, get_extra=False)
+
         if min_ram:
             return self._findFlavorByRam(min_ram, flavor_name)
         else:
@@ -314,14 +321,14 @@
 
         try:
             return self._client.create_server(wait=False, **create_args)
-        except shade.OpenStackCloudBadRequest:
+        except openstack.exceptions.BadRequestException:
             # We've gotten a 400 error from nova - which means the request
             # was malformed. The most likely cause of that, unless something
             # became functionally and systemically broken, is stale image
             # or flavor cache. Log a message, invalidate the caches so that
             # next time we get new caches.
             self._images = {}
-            self.__flavors = {}
+            self.__flavors = {}  # TODO(gtema): caching
             self.log.info(
                 "Clearing flavor and image caches due to 400 error from nova")
             raise
@@ -332,7 +339,7 @@
     def getServerConsole(self, server_id):
         try:
             return self._client.get_server_console(server_id)
-        except shade.OpenStackCloudException:
+        except openstack.exceptions.OpenStackCloudException:
            return None
 
     def waitForServer(self, server, timeout=3600, auto_ip=True):
@@ -46,7 +46,7 @@ _DEFAULT_SERVER_LOGGING_CONFIG = {
             'handlers': ['console'],
             'level': 'WARN',
         },
-        'shade': {
+        'openstack': {
             'handlers': ['console'],
             'level': 'WARN',
         },
@@ -21,6 +21,8 @@ import logging
 import queue
 import time
 
+from openstack import task_manager as openstack_task_manager
+
 from nodepool import stats
 
 
@@ -28,24 +30,19 @@ class ManagerStoppedException(Exception):
     pass
 
 
-class TaskManager(object):
+class TaskManager(openstack_task_manager.TaskManager):
     log = logging.getLogger("nodepool.TaskManager")
 
-    def __init__(self, client, name, rate):
-        super(TaskManager, self).__init__()
+    def __init__(self, name, rate, workers=5):
+        super(TaskManager, self).__init__(name=name, workers=workers)
         self.daemon = True
         self.queue = queue.Queue()
         self._running = True
-        self.name = name
         self.rate = float(rate)
-        self._client = None
         self.statsd = stats.get_client()
         self._thread = threading.Thread(name=name, target=self.run)
         self._thread.daemon = True
 
-    def setClient(self, client):
-        self._client = client
-
     def start(self):
         self._thread.start()
 
@@ -70,33 +67,25 @@ class TaskManager(object):
                     if delta >= self.rate:
                         break
                     time.sleep(self.rate - delta)
-                self.log.debug("Manager %s running task %s (queue: %s)" %
-                               (self.name, type(task).__name__,
-                                self.queue.qsize()))
-                start = time.time()
-                self.runTask(task)
-                last_ts = time.time()
-                dt = last_ts - start
-                self.log.debug("Manager %s ran task %s in %ss" %
-                               (self.name, type(task).__name__, dt))
-                if self.statsd:
-                    # nodepool.task.PROVIDER.subkey
-                    subkey = type(task).__name__
-                    key = 'nodepool.task.%s.%s' % (self.name, subkey)
-                    self.statsd.timing(key, int(dt * 1000))
-                    self.statsd.incr(key)
-
+                self.log.debug("Manager %s running task %s (queue %s)" %
+                               (self.name, task.name, self.queue.qsize()))
+                self.run_task(task)
                 self.queue.task_done()
         except Exception:
             self.log.exception("Task manager died.")
             raise
 
-    def submitTask(self, task):
+    def post_run_task(self, elapsed_time, task):
+        super(TaskManager, self).post_run_task(elapsed_time, task)
+        if self.statsd:
+            # nodepool.task.PROVIDER.TASK_NAME
+            key = 'nodepool.task.%s.%s' % (self.name, task.name)
+            self.statsd.timing(key, int(elapsed_time * 1000))
+            self.statsd.incr(key)
+
+    def submit_task(self, task, raw=False):
         if not self._running:
             raise ManagerStoppedException(
                 "Manager %s is no longer running" % self.name)
         self.queue.put(task)
         return task.wait()
-
-    def runTask(self, task):
-        task.run(self._client)
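The statsd reporting above moves out of nodepool's hand-rolled run loop and into post_run_task, the hook openstacksdk's base TaskManager calls after each task with its elapsed time. A hypothetical further subclass illustrating the same hook (the class and manager names are made up; only TaskManager, post_run_task, task.name, and the constructor signature come from the diff):

from nodepool.task_manager import TaskManager

class PrintingTaskManager(TaskManager):
    def post_run_task(self, elapsed_time, task):
        # Let nodepool (and openstacksdk beneath it) do their bookkeeping.
        super(PrintingTaskManager, self).post_run_task(elapsed_time, task)
        # Same key scheme as the statsd block: nodepool.task.<provider>.<TaskName>
        print('nodepool.task.%s.%s: %dms'
              % (self.name, task.name, int(elapsed_time * 1000)))

manager = PrintingTaskManager(name='example-cloud', rate=1.0)
manager.start()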
@@ -7,8 +7,7 @@ extras
 statsd>=3.0
 sqlalchemy>=0.8.2,<1.1.0
 PrettyTable>=0.6,<0.8
-os-client-config>=1.2.0
-shade>=1.21.0
+openstacksdk>=0.16.0
 diskimage-builder>=2.0.0
 voluptuous
 kazoo