Merge "Fix RPC Versioning"
This commit is contained in:
commit
0d514081c9
@ -16,13 +16,13 @@ from oslo_concurrency import processutils
|
||||
from oslo_service import service as openstack_service
|
||||
|
||||
from trove.cmd.common import with_initialize
|
||||
from trove.conductor import api as conductor_api
|
||||
|
||||
|
||||
@with_initialize
|
||||
def main(conf):
|
||||
from trove.common import notification
|
||||
from trove.common.rpc import service as rpc_service
|
||||
from trove.common.rpc import version as rpc_version
|
||||
from trove.instance import models as inst_models
|
||||
|
||||
notification.DBaaSAPINotification.register_notify_callback(
|
||||
@ -30,7 +30,7 @@ def main(conf):
|
||||
topic = conf.conductor_queue
|
||||
server = rpc_service.RpcService(
|
||||
manager=conf.conductor_manager, topic=topic,
|
||||
rpc_api_version=rpc_version.RPC_API_VERSION)
|
||||
rpc_api_version=conductor_api.API.API_LATEST_VERSION)
|
||||
workers = conf.trove_conductor_workers or processutils.get_worker_count()
|
||||
launcher = openstack_service.launch(conf, server, workers=workers)
|
||||
launcher.wait()
|
||||
|
@ -25,6 +25,7 @@ from oslo_service import service as openstack_service
|
||||
from trove.common import cfg
|
||||
from trove.common import debug_utils
|
||||
from trove.common.i18n import _LE
|
||||
from trove.guestagent import api as guest_api
|
||||
|
||||
CONF = cfg.CONF
|
||||
# The guest_id opt definition must match the one in common/cfg.py
|
||||
@ -57,11 +58,10 @@ def main():
|
||||
rpc.init(CONF)
|
||||
|
||||
from trove.common.rpc import service as rpc_service
|
||||
from trove.common.rpc import version as rpc_version
|
||||
server = rpc_service.RpcService(
|
||||
topic="guestagent.%s" % CONF.guest_id,
|
||||
manager=manager, host=CONF.guest_id,
|
||||
rpc_api_version=rpc_version.RPC_API_VERSION)
|
||||
rpc_api_version=guest_api.API.API_LATEST_VERSION)
|
||||
|
||||
launcher = openstack_service.launch(CONF, server)
|
||||
launcher.wait()
|
||||
|
@ -16,6 +16,7 @@ from oslo_config import cfg as openstack_cfg
|
||||
from oslo_service import service as openstack_service
|
||||
|
||||
from trove.cmd.common import with_initialize
|
||||
from trove.taskmanager import api as task_api
|
||||
|
||||
|
||||
extra_opts = [openstack_cfg.StrOpt('taskmanager_manager')]
|
||||
@ -24,14 +25,13 @@ extra_opts = [openstack_cfg.StrOpt('taskmanager_manager')]
|
||||
def startup(conf, topic):
|
||||
from trove.common import notification
|
||||
from trove.common.rpc import service as rpc_service
|
||||
from trove.common.rpc import version as rpc_version
|
||||
from trove.instance import models as inst_models
|
||||
|
||||
notification.DBaaSAPINotification.register_notify_callback(
|
||||
inst_models.persist_instance_fault)
|
||||
server = rpc_service.RpcService(
|
||||
manager=conf.taskmanager_manager, topic=topic,
|
||||
rpc_api_version=rpc_version.RPC_API_VERSION)
|
||||
rpc_api_version=task_api.API.API_LATEST_VERSION)
|
||||
launcher = openstack_service.launch(conf, server)
|
||||
launcher.wait()
|
||||
|
||||
|
@ -1449,18 +1449,19 @@ mariadb_opts = [
|
||||
upgrade_levels = cfg.OptGroup(
|
||||
'upgrade_levels',
|
||||
title='RPC upgrade levels group for handling versions',
|
||||
help='Contains the support version caps for each RPC API')
|
||||
help='Contains the support version caps (Openstack Release) for '
|
||||
'each RPC API')
|
||||
|
||||
rpcapi_cap_opts = [
|
||||
cfg.StrOpt(
|
||||
'taskmanager', default="icehouse",
|
||||
'taskmanager', default='latest',
|
||||
help='Set a version cap for messages sent to taskmanager services'),
|
||||
cfg.StrOpt(
|
||||
'guestagent', default="icehouse",
|
||||
'guestagent', default='latest',
|
||||
help='Set a version cap for messages sent to guestagent services'),
|
||||
cfg.StrOpt(
|
||||
'conductor', default="icehouse",
|
||||
help='Set a version cap for messages sent to conductor services'),
|
||||
'conductor', default='latest',
|
||||
help='Set Openstack Release compatibility for conductor services'),
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
@ -30,65 +30,100 @@ class CassandraGuestAgentStrategy(base.BaseGuestAgentStrategy):
|
||||
|
||||
|
||||
class CassandraGuestAgentAPI(guest_api.API):
|
||||
"""Cluster Specific Datastore Guest API
|
||||
|
||||
**** VERSION CONTROLLED API ****
|
||||
|
||||
The methods in this class are subject to version control as
|
||||
coordinated by guestagent/api.py. Whenever a change is made to
|
||||
any API method in this class, add a version number and comment
|
||||
to the top of guestagent/api.py and use the version number as
|
||||
appropriate in this file
|
||||
"""
|
||||
|
||||
def get_data_center(self):
|
||||
LOG.debug("Retrieving the data center for node: %s" % self.id)
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("get_data_center", guest_api.AGENT_LOW_TIMEOUT,
|
||||
self.version_cap)
|
||||
version=version)
|
||||
|
||||
def get_rack(self):
|
||||
LOG.debug("Retrieving the rack for node: %s" % self.id)
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("get_rack", guest_api.AGENT_LOW_TIMEOUT,
|
||||
self.version_cap)
|
||||
version=version)
|
||||
|
||||
def set_seeds(self, seeds):
|
||||
LOG.debug("Configuring the gossip seeds for node: %s" % self.id)
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("set_seeds", guest_api.AGENT_LOW_TIMEOUT,
|
||||
self.version_cap, seeds=seeds)
|
||||
version=version, seeds=seeds)
|
||||
|
||||
def get_seeds(self):
|
||||
LOG.debug("Retrieving the gossip seeds for node: %s" % self.id)
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("get_seeds", guest_api.AGENT_LOW_TIMEOUT,
|
||||
self.version_cap)
|
||||
version=version)
|
||||
|
||||
def set_auto_bootstrap(self, enabled):
|
||||
LOG.debug("Setting the auto-bootstrap to '%s' for node: %s"
|
||||
% (enabled, self.id))
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("set_auto_bootstrap", guest_api.AGENT_LOW_TIMEOUT,
|
||||
self.version_cap, enabled=enabled)
|
||||
version=version, enabled=enabled)
|
||||
|
||||
def cluster_complete(self):
|
||||
LOG.debug("Sending a setup completion notification for node: %s"
|
||||
% self.id)
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("cluster_complete", guest_api.AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap)
|
||||
version=version)
|
||||
|
||||
def node_cleanup_begin(self):
|
||||
LOG.debug("Signaling the node to prepare for cleanup: %s" % self.id)
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("node_cleanup_begin", guest_api.AGENT_LOW_TIMEOUT,
|
||||
self.version_cap)
|
||||
version=version)
|
||||
|
||||
def node_cleanup(self):
|
||||
LOG.debug("Running cleanup on node: %s" % self.id)
|
||||
return self._cast('node_cleanup', self.version_cap)
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._cast('node_cleanup', version=version)
|
||||
|
||||
def node_decommission(self):
|
||||
LOG.debug("Decommission node: %s" % self.id)
|
||||
return self._cast("node_decommission", self.version_cap)
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._cast("node_decommission", version=version)
|
||||
|
||||
def cluster_secure(self, password):
|
||||
LOG.debug("Securing the cluster via node: %s" % self.id)
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call(
|
||||
"cluster_secure", guest_api.AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap, password=password)
|
||||
version=version, password=password)
|
||||
|
||||
def get_admin_credentials(self):
|
||||
LOG.debug("Retrieving the admin credentials from node: %s" % self.id)
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("get_admin_credentials", guest_api.AGENT_LOW_TIMEOUT,
|
||||
self.version_cap)
|
||||
version=version)
|
||||
|
||||
def store_admin_credentials(self, admin_credentials):
|
||||
LOG.debug("Storing the admin credentials on node: %s" % self.id)
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("store_admin_credentials",
|
||||
guest_api.AGENT_LOW_TIMEOUT, self.version_cap,
|
||||
guest_api.AGENT_LOW_TIMEOUT,
|
||||
version=version,
|
||||
admin_credentials=admin_credentials)
|
||||
|
@ -31,39 +31,59 @@ class GaleraCommonGuestAgentStrategy(cluster_base.BaseGuestAgentStrategy):
|
||||
|
||||
|
||||
class GaleraCommonGuestAgentAPI(guest_api.API):
|
||||
"""Cluster Specific Datastore Guest API
|
||||
|
||||
**** VERSION CONTROLLED API ****
|
||||
|
||||
The methods in this class are subject to version control as
|
||||
coordinated by guestagent/api.py. Whenever a change is made to
|
||||
any API method in this class, add a version number and comment
|
||||
to the top of guestagent/api.py and use the version number as
|
||||
appropriate in this file
|
||||
"""
|
||||
|
||||
def install_cluster(self, replication_user, cluster_configuration,
|
||||
bootstrap):
|
||||
"""Install the cluster."""
|
||||
LOG.debug("Installing Galera cluster.")
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
self._call("install_cluster", CONF.cluster_usage_timeout,
|
||||
self.version_cap,
|
||||
version=version,
|
||||
replication_user=replication_user,
|
||||
cluster_configuration=cluster_configuration,
|
||||
bootstrap=bootstrap)
|
||||
|
||||
def reset_admin_password(self, admin_password):
|
||||
"""Store this password on the instance as the admin password."""
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
self._call("reset_admin_password", CONF.cluster_usage_timeout,
|
||||
self.version_cap,
|
||||
version=version,
|
||||
admin_password=admin_password)
|
||||
|
||||
def cluster_complete(self):
|
||||
"""Set the status that the cluster is build is complete."""
|
||||
LOG.debug("Notifying cluster install completion.")
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("cluster_complete", guest_api.AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap)
|
||||
version=version)
|
||||
|
||||
def get_cluster_context(self):
|
||||
"""Get the context of the cluster."""
|
||||
LOG.debug("Getting the cluster context.")
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("get_cluster_context", guest_api.AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap)
|
||||
version=version)
|
||||
|
||||
def write_cluster_configuration_overrides(self, cluster_configuration):
|
||||
"""Write an updated the cluster configuration."""
|
||||
LOG.debug("Writing an updated the cluster configuration.")
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
self._call("write_cluster_configuration_overrides",
|
||||
guest_api.AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap,
|
||||
version=version,
|
||||
cluster_configuration=cluster_configuration)
|
||||
|
@ -33,6 +33,16 @@ class MongoDbGuestAgentStrategy(base.BaseGuestAgentStrategy):
|
||||
|
||||
|
||||
class MongoDbGuestAgentAPI(guest_api.API):
|
||||
"""Cluster Specific Datastore Guest API
|
||||
|
||||
**** VERSION CONTROLLED API ****
|
||||
|
||||
The methods in this class are subject to version control as
|
||||
coordinated by guestagent/api.py. Whenever a change is made to
|
||||
any API method in this class, add a version number and comment
|
||||
to the top of guestagent/api.py and use the version number as
|
||||
appropriate in this file
|
||||
"""
|
||||
|
||||
def add_shard(self, replica_set_name, replica_set_member):
|
||||
LOG.debug("Adding shard with replSet %(replica_set_name)s and member "
|
||||
@ -40,66 +50,89 @@ class MongoDbGuestAgentAPI(guest_api.API):
|
||||
"%(id)s" % {'replica_set_name': replica_set_name,
|
||||
'replica_set_member': replica_set_member,
|
||||
'id': self.id})
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("add_shard", guest_api.AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap,
|
||||
version=version,
|
||||
replica_set_name=replica_set_name,
|
||||
replica_set_member=replica_set_member)
|
||||
|
||||
def add_members(self, members):
|
||||
LOG.debug("Adding members %(members)s on instance %(id)s" % {
|
||||
'members': members, 'id': self.id})
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("add_members", ADD_MEMBERS_TIMEOUT,
|
||||
self.version_cap, members=members)
|
||||
version=version, members=members)
|
||||
|
||||
def add_config_servers(self, config_servers):
|
||||
LOG.debug("Adding config servers %(config_servers)s for instance "
|
||||
"%(id)s" % {'config_servers': config_servers,
|
||||
'id': self.id})
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("add_config_servers", guest_api.AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap, config_servers=config_servers)
|
||||
version=version,
|
||||
config_servers=config_servers)
|
||||
|
||||
def cluster_complete(self):
|
||||
LOG.debug("Notify regarding cluster install completion")
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("cluster_complete", guest_api.AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap)
|
||||
version=version)
|
||||
|
||||
def get_key(self):
|
||||
LOG.debug("Requesting cluster key from guest")
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("get_key", guest_api.AGENT_LOW_TIMEOUT,
|
||||
self.version_cap)
|
||||
version=version)
|
||||
|
||||
def prep_primary(self):
|
||||
LOG.debug("Preparing member to be primary member.")
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("prep_primary", guest_api.AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap)
|
||||
version=version)
|
||||
|
||||
def create_admin_user(self, password):
|
||||
LOG.debug("Creating admin user")
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("create_admin_user", guest_api.AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap, password=password)
|
||||
version=version, password=password)
|
||||
|
||||
def store_admin_password(self, password):
|
||||
LOG.debug("Storing admin password")
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("store_admin_password",
|
||||
guest_api.AGENT_LOW_TIMEOUT,
|
||||
self.version_cap,
|
||||
version=version,
|
||||
password=password)
|
||||
|
||||
def get_replica_set_name(self):
|
||||
LOG.debug("Querying member for its replica set name")
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("get_replica_set_name",
|
||||
guest_api.AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap)
|
||||
version=version)
|
||||
|
||||
def get_admin_password(self):
|
||||
LOG.debug("Querying instance for its admin password")
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("get_admin_password",
|
||||
guest_api.AGENT_LOW_TIMEOUT,
|
||||
self.version_cap)
|
||||
version=version)
|
||||
|
||||
def is_shard_active(self, replica_set_name):
|
||||
LOG.debug("Checking if replica set %s is active" % replica_set_name)
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("is_shard_active",
|
||||
guest_api.AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap,
|
||||
version=version,
|
||||
replica_set_name=replica_set_name)
|
||||
|
@ -379,7 +379,8 @@ class MongoDbTaskManagerAPI(task_api.API):
|
||||
def mongodb_add_shard_cluster(self, cluster_id, shard_id,
|
||||
replica_set_name):
|
||||
LOG.debug("Making async call to add shard cluster %s " % cluster_id)
|
||||
cctxt = self.client.prepare(version=self.version_cap)
|
||||
version = task_api.API.API_BASE_VERSION
|
||||
cctxt = self.client.prepare(version=version)
|
||||
cctxt.cast(self.context,
|
||||
"add_shard_cluster",
|
||||
cluster_id=cluster_id,
|
||||
|
@ -28,34 +28,59 @@ class RedisGuestAgentStrategy(base.BaseGuestAgentStrategy):
|
||||
|
||||
|
||||
class RedisGuestAgentAPI(guest_api.API):
|
||||
"""Cluster Specific Datastore Guest API
|
||||
|
||||
**** VERSION CONTROLLED API ****
|
||||
|
||||
The methods in this class are subject to version control as
|
||||
coordinated by guestagent/api.py. Whenever a change is made to
|
||||
any API method in this class, add a version number and comment
|
||||
to the top of guestagent/api.py and use the version number as
|
||||
appropriate in this file
|
||||
"""
|
||||
|
||||
def get_node_ip(self):
|
||||
LOG.debug("Retrieve ip info from node.")
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("get_node_ip",
|
||||
guest_api.AGENT_HIGH_TIMEOUT, self.version_cap)
|
||||
guest_api.AGENT_HIGH_TIMEOUT,
|
||||
version=version)
|
||||
|
||||
def get_node_id_for_removal(self):
|
||||
LOG.debug("Validating cluster node removal.")
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("get_node_id_for_removal",
|
||||
guest_api.AGENT_HIGH_TIMEOUT, self.version_cap)
|
||||
guest_api.AGENT_HIGH_TIMEOUT,
|
||||
version=version)
|
||||
|
||||
def remove_nodes(self, node_ids):
|
||||
LOG.debug("Removing nodes from cluster.")
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("remove_nodes", guest_api.AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap, node_ids=node_ids)
|
||||
version=version, node_ids=node_ids)
|
||||
|
||||
def cluster_meet(self, ip, port):
|
||||
LOG.debug("Joining node to cluster.")
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("cluster_meet", guest_api.AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap, ip=ip, port=port)
|
||||
version=version, ip=ip, port=port)
|
||||
|
||||
def cluster_addslots(self, first_slot, last_slot):
|
||||
LOG.debug("Adding slots %s-%s to cluster.", first_slot, last_slot)
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("cluster_addslots",
|
||||
guest_api.AGENT_HIGH_TIMEOUT, self.version_cap,
|
||||
guest_api.AGENT_HIGH_TIMEOUT,
|
||||
version=version,
|
||||
first_slot=first_slot, last_slot=last_slot)
|
||||
|
||||
def cluster_complete(self):
|
||||
LOG.debug("Notifying cluster install completion.")
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("cluster_complete", guest_api.AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap)
|
||||
version=version)
|
||||
|
@ -30,39 +30,64 @@ class VerticaGuestAgentStrategy(base.BaseGuestAgentStrategy):
|
||||
|
||||
|
||||
class VerticaGuestAgentAPI(guest_api.API):
|
||||
"""Cluster Specific Datastore Guest API
|
||||
|
||||
**** VERSION CONTROLLED API ****
|
||||
|
||||
The methods in this class are subject to version control as
|
||||
coordinated by guestagent/api.py. Whenever a change is made to
|
||||
any API method in this class, add a version number and comment
|
||||
to the top of guestagent/api.py and use the version number as
|
||||
appropriate in this file
|
||||
"""
|
||||
|
||||
def get_public_keys(self, user):
|
||||
LOG.debug("Getting public keys for user: %s." % user)
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("get_public_keys", guest_api.AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap, user=user)
|
||||
version=version, user=user)
|
||||
|
||||
def authorize_public_keys(self, user, public_keys):
|
||||
LOG.debug("Authorizing public keys for user: %s." % user)
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("authorize_public_keys",
|
||||
guest_api.AGENT_HIGH_TIMEOUT, self.version_cap,
|
||||
guest_api.AGENT_HIGH_TIMEOUT,
|
||||
version=version,
|
||||
user=user, public_keys=public_keys)
|
||||
|
||||
def install_cluster(self, members):
|
||||
LOG.debug("Installing Vertica cluster on members: %s." % members)
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("install_cluster", CONF.cluster_usage_timeout,
|
||||
self.version_cap, members=members)
|
||||
version=version, members=members)
|
||||
|
||||
def grow_cluster(self, members):
|
||||
LOG.debug("Growing Vertica cluster with members: %s." % members)
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("grow_cluster", CONF.cluster_usage_timeout,
|
||||
self.version_cap, members=members)
|
||||
version=version, members=members)
|
||||
|
||||
def shrink_cluster(self, members):
|
||||
LOG.debug("Shrinking Vertica cluster with members: %s." % members)
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("shrink_cluster", CONF.cluster_usage_timeout,
|
||||
self.version_cap, members=members)
|
||||
version=version, members=members)
|
||||
|
||||
def mark_design_ksafe(self, k):
|
||||
LOG.debug("Setting vertica k-safety level to : %s." % k)
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("mark_design_ksafe", CONF.cluster_usage_timeout,
|
||||
self.version_cap, k=k)
|
||||
version=version, k=k)
|
||||
|
||||
def cluster_complete(self):
|
||||
LOG.debug("Notifying cluster install completion.")
|
||||
version = guest_api.API.API_BASE_VERSION
|
||||
|
||||
return self._call("cluster_complete", guest_api.AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap)
|
||||
version=version)
|
||||
|
@ -16,28 +16,50 @@ from oslo_log import log as logging
|
||||
import oslo_messaging as messaging
|
||||
|
||||
from trove.common import cfg
|
||||
from trove.common.rpc import version as rpc_version
|
||||
from trove.common.serializable_notification import SerializableNotification
|
||||
from trove import rpc
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class API(object):
|
||||
"""API for interacting with trove conductor."""
|
||||
"""API for interacting with trove conductor.
|
||||
|
||||
API version history:
|
||||
* 1.0 - Initial version.
|
||||
|
||||
When updating this API, also update API_LATEST_VERSION
|
||||
"""
|
||||
|
||||
# API_LATEST_VERSION should bump the minor number each time
|
||||
# a method signature is added or changed
|
||||
API_LATEST_VERSION = '1.0'
|
||||
|
||||
# API_BASE_VERSION should only change on major version upgrade
|
||||
API_BASE_VERSION = '1.0'
|
||||
|
||||
VERSION_ALIASES = {
|
||||
'icehouse': '1.0',
|
||||
'juno': '1.0',
|
||||
'kilo': '1.0',
|
||||
'liberty': '1.0',
|
||||
'mitaka': '1.0',
|
||||
'newton': '1.0',
|
||||
|
||||
'latest': API_LATEST_VERSION
|
||||
}
|
||||
|
||||
def __init__(self, context):
|
||||
self.context = context
|
||||
super(API, self).__init__()
|
||||
|
||||
version_cap = self.VERSION_ALIASES.get(
|
||||
CONF.upgrade_levels.conductor, CONF.upgrade_levels.conductor)
|
||||
target = messaging.Target(topic=CONF.conductor_queue,
|
||||
version=rpc_version.RPC_API_VERSION)
|
||||
version=version_cap)
|
||||
|
||||
self.version_cap = rpc_version.VERSION_ALIASES.get(
|
||||
CONF.upgrade_levels.conductor)
|
||||
self.client = self.get_client(target, self.version_cap)
|
||||
self.client = self.get_client(target, version_cap)
|
||||
|
||||
def get_client(self, target, version_cap, serializer=None):
|
||||
return rpc.get_client(target,
|
||||
@ -47,8 +69,9 @@ class API(object):
|
||||
def heartbeat(self, instance_id, payload, sent=None):
|
||||
LOG.debug("Making async call to cast heartbeat for instance: %s"
|
||||
% instance_id)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
cctxt = self.client.prepare(version=self.version_cap)
|
||||
cctxt = self.client.prepare(version=version)
|
||||
cctxt.cast(self.context, "heartbeat",
|
||||
instance_id=instance_id,
|
||||
sent=sent,
|
||||
@ -58,8 +81,9 @@ class API(object):
|
||||
**backup_fields):
|
||||
LOG.debug("Making async call to cast update_backup for instance: %s"
|
||||
% instance_id)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
cctxt = self.client.prepare(version=self.version_cap)
|
||||
cctxt = self.client.prepare(version=version)
|
||||
cctxt.cast(self.context, "update_backup",
|
||||
instance_id=instance_id,
|
||||
backup_id=backup_id,
|
||||
@ -69,14 +93,16 @@ class API(object):
|
||||
def report_root(self, instance_id, user):
|
||||
LOG.debug("Making async call to cast report_root for instance: %s"
|
||||
% instance_id)
|
||||
cctxt = self.client.prepare(version=self.version_cap)
|
||||
version = self.API_BASE_VERSION
|
||||
cctxt = self.client.prepare(version=version)
|
||||
cctxt.cast(self.context, "report_root",
|
||||
instance_id=instance_id,
|
||||
user=user)
|
||||
|
||||
def notify_end(self, **notification_args):
|
||||
LOG.debug("Making async call to cast end notification")
|
||||
cctxt = self.client.prepare(version=self.version_cap)
|
||||
version = self.API_BASE_VERSION
|
||||
cctxt = self.client.prepare(version=version)
|
||||
context = self.context
|
||||
serialized = SerializableNotification.serialize(context,
|
||||
context.notification)
|
||||
@ -86,7 +112,8 @@ class API(object):
|
||||
|
||||
def notify_exc_info(self, message, exception):
|
||||
LOG.debug("Making async call to cast error notification")
|
||||
cctxt = self.client.prepare(version=self.version_cap)
|
||||
version = self.API_BASE_VERSION
|
||||
cctxt = self.client.prepare(version=version)
|
||||
context = self.context
|
||||
serialized = SerializableNotification.serialize(context,
|
||||
context.notification)
|
||||
|
@ -26,7 +26,6 @@ from trove.common import cfg
|
||||
from trove.common import exception
|
||||
from trove.common.i18n import _
|
||||
from trove.common.notification import NotificationCastWrapper
|
||||
import trove.common.rpc.version as rpc_version
|
||||
from trove import rpc
|
||||
|
||||
CONF = cfg.CONF
|
||||
@ -37,19 +36,43 @@ AGENT_SNAPSHOT_TIMEOUT = CONF.agent_replication_snapshot_timeout
|
||||
|
||||
|
||||
class API(object):
|
||||
"""API for interacting with the guest manager."""
|
||||
"""API for interacting with the guest manager.
|
||||
|
||||
API version history:
|
||||
* 1.0 - Initial version.
|
||||
|
||||
When updating this API, also update API_LATEST_VERSION
|
||||
"""
|
||||
|
||||
# API_LATEST_VERSION should bump the minor number each time
|
||||
# a method signature is added or changed
|
||||
API_LATEST_VERSION = '1.0'
|
||||
|
||||
# API_BASE_VERSION should only change on major version upgrade
|
||||
API_BASE_VERSION = '1.0'
|
||||
|
||||
VERSION_ALIASES = {
|
||||
'icehouse': '1.0',
|
||||
'juno': '1.0',
|
||||
'kilo': '1.0',
|
||||
'liberty': '1.0',
|
||||
'mitaka': '1.0',
|
||||
'newton': '1.0',
|
||||
|
||||
'latest': API_LATEST_VERSION
|
||||
}
|
||||
|
||||
def __init__(self, context, id):
|
||||
self.context = context
|
||||
self.id = id
|
||||
super(API, self).__init__()
|
||||
|
||||
version_cap = self.VERSION_ALIASES.get(
|
||||
CONF.upgrade_levels.guestagent, CONF.upgrade_levels.guestagent)
|
||||
target = messaging.Target(topic=self._get_routing_key(),
|
||||
version=rpc_version.RPC_API_VERSION)
|
||||
version=version_cap)
|
||||
|
||||
self.version_cap = rpc_version.VERSION_ALIASES.get(
|
||||
CONF.upgrade_levels.guestagent)
|
||||
self.client = self.get_client(target, self.version_cap)
|
||||
self.client = self.get_client(target, version_cap)
|
||||
|
||||
def get_client(self, target, version_cap, serializer=None):
|
||||
return rpc.get_client(target,
|
||||
@ -95,31 +118,44 @@ class API(object):
|
||||
users.
|
||||
"""
|
||||
LOG.debug("Changing passwords for users on instance %s.", self.id)
|
||||
self._cast("change_passwords", self.version_cap, users=users)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._cast("change_passwords", version=version, users=users)
|
||||
|
||||
def update_attributes(self, username, hostname, user_attrs):
|
||||
"""Update user attributes."""
|
||||
LOG.debug("Changing user attributes on instance %s.", self.id)
|
||||
self._cast("update_attributes", self.version_cap, username=username,
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._cast("update_attributes",
|
||||
version=version, username=username,
|
||||
hostname=hostname, user_attrs=user_attrs)
|
||||
|
||||
def create_user(self, users):
|
||||
"""Make an asynchronous call to create a new database user"""
|
||||
LOG.debug("Creating Users for instance %s.", self.id)
|
||||
self._cast("create_user", self.version_cap, users=users)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._cast("create_user", version=version, users=users)
|
||||
|
||||
def get_user(self, username, hostname):
|
||||
"""Make an asynchronous call to get a single database user."""
|
||||
LOG.debug("Getting a user %(username)s on instance %(id)s.",
|
||||
{'username': username, 'id': self.id})
|
||||
return self._call("get_user", AGENT_LOW_TIMEOUT, self.version_cap,
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
return self._call("get_user",
|
||||
AGENT_LOW_TIMEOUT, version=version,
|
||||
username=username, hostname=hostname)
|
||||
|
||||
def list_access(self, username, hostname):
|
||||
"""Show all the databases to which a user has more than USAGE."""
|
||||
LOG.debug("Showing user %(username)s grants on instance %(id)s.",
|
||||
{'username': username, 'id': self.id})
|
||||
return self._call("list_access", AGENT_LOW_TIMEOUT, self.version_cap,
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
return self._call("list_access",
|
||||
AGENT_LOW_TIMEOUT, version=version,
|
||||
username=username, hostname=hostname)
|
||||
|
||||
def grant_access(self, username, hostname, databases):
|
||||
@ -128,7 +164,10 @@ class API(object):
|
||||
"%(username)s on instance %(id)s.", {'username': username,
|
||||
'databases': databases,
|
||||
'id': self.id})
|
||||
return self._call("grant_access", AGENT_LOW_TIMEOUT, self.version_cap,
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
return self._call("grant_access",
|
||||
AGENT_LOW_TIMEOUT, version=version,
|
||||
username=username, hostname=hostname,
|
||||
databases=databases)
|
||||
|
||||
@ -138,14 +177,20 @@ class API(object):
|
||||
"%(username)s on instance %(id)s.", {'username': username,
|
||||
'database': database,
|
||||
'id': self.id})
|
||||
return self._call("revoke_access", AGENT_LOW_TIMEOUT, self.version_cap,
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
return self._call("revoke_access",
|
||||
AGENT_LOW_TIMEOUT, version=version,
|
||||
username=username, hostname=hostname,
|
||||
database=database)
|
||||
|
||||
def list_users(self, limit=None, marker=None, include_marker=False):
|
||||
"""Make an asynchronous call to list database users."""
|
||||
LOG.debug("Listing Users for instance %s.", self.id)
|
||||
return self._call("list_users", AGENT_HIGH_TIMEOUT, self.version_cap,
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
return self._call("list_users", AGENT_HIGH_TIMEOUT,
|
||||
version=version,
|
||||
limit=limit, marker=marker,
|
||||
include_marker=include_marker)
|
||||
|
||||
@ -153,20 +198,27 @@ class API(object):
|
||||
"""Make an asynchronous call to delete an existing database user."""
|
||||
LOG.debug("Deleting user %(user)s for instance %(instance_id)s." %
|
||||
{'user': user, 'instance_id': self.id})
|
||||
self._cast("delete_user", self.version_cap, user=user)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._cast("delete_user", version=version, user=user)
|
||||
|
||||
def create_database(self, databases):
|
||||
"""Make an asynchronous call to create a new database
|
||||
within the specified container
|
||||
"""
|
||||
LOG.debug("Creating databases for instance %s.", self.id)
|
||||
self._cast("create_database", self.version_cap, databases=databases)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._cast("create_database", version=version,
|
||||
databases=databases)
|
||||
|
||||
def list_databases(self, limit=None, marker=None, include_marker=False):
|
||||
"""Make an asynchronous call to list databases."""
|
||||
LOG.debug("Listing databases for instance %s.", self.id)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
return self._call("list_databases", AGENT_LOW_TIMEOUT,
|
||||
self.version_cap, limit=limit, marker=marker,
|
||||
version=version, limit=limit, marker=marker,
|
||||
include_marker=include_marker)
|
||||
|
||||
def delete_database(self, database):
|
||||
@ -176,53 +228,72 @@ class API(object):
|
||||
LOG.debug("Deleting database %(database)s for "
|
||||
"instance %(instance_id)s." % {'database': database,
|
||||
'instance_id': self.id})
|
||||
self._cast("delete_database", self.version_cap, database=database)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._cast("delete_database", version=version, database=database)
|
||||
|
||||
def enable_root(self):
|
||||
"""Make a synchronous call to enable the root user for
|
||||
access from anywhere
|
||||
"""
|
||||
LOG.debug("Enable root user for instance %s.", self.id)
|
||||
return self._call("enable_root", AGENT_HIGH_TIMEOUT, self.version_cap)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
return self._call("enable_root", AGENT_HIGH_TIMEOUT,
|
||||
version=version)
|
||||
|
||||
def enable_root_with_password(self, root_password=None):
|
||||
"""Make a synchronous call to enable the root user for
|
||||
access from anywhere
|
||||
"""
|
||||
LOG.debug("Enable root user for instance %s.", self.id)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
return self._call("enable_root_with_password", AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap, root_password=root_password)
|
||||
version=version, root_password=root_password)
|
||||
|
||||
def disable_root(self):
|
||||
"""Make a synchronous call to disable the root user for
|
||||
access from anywhere
|
||||
"""
|
||||
LOG.debug("Disable root user for instance %s.", self.id)
|
||||
return self._call("disable_root", AGENT_LOW_TIMEOUT, self.version_cap)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
return self._call("disable_root", AGENT_LOW_TIMEOUT,
|
||||
version=version)
|
||||
|
||||
def is_root_enabled(self):
|
||||
"""Make a synchronous call to check if root access is
|
||||
available for the container
|
||||
"""
|
||||
LOG.debug("Check root access for instance %s.", self.id)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
return self._call("is_root_enabled", AGENT_LOW_TIMEOUT,
|
||||
self.version_cap)
|
||||
version=version)
|
||||
|
||||
def get_hwinfo(self):
|
||||
"""Make a synchronous call to get hardware info for the container"""
|
||||
LOG.debug("Check hwinfo on instance %s.", self.id)
|
||||
return self._call("get_hwinfo", AGENT_LOW_TIMEOUT, self.version_cap)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
return self._call("get_hwinfo", AGENT_LOW_TIMEOUT,
|
||||
version=version)
|
||||
|
||||
def get_diagnostics(self):
|
||||
"""Make a synchronous call to get diagnostics for the container"""
|
||||
LOG.debug("Check diagnostics on instance %s.", self.id)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
return self._call("get_diagnostics", AGENT_LOW_TIMEOUT,
|
||||
self.version_cap)
|
||||
version=version)
|
||||
|
||||
def rpc_ping(self):
|
||||
"""Make a synchronous RPC call to check if we can ping the instance."""
|
||||
LOG.debug("Check RPC ping on instance %s.", self.id)
|
||||
return self._call("rpc_ping", AGENT_LOW_TIMEOUT, self.version_cap)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
return self._call("rpc_ping", AGENT_LOW_TIMEOUT, version=version)
|
||||
|
||||
def prepare(self, memory_mb, packages, databases, users,
|
||||
device_path='/dev/vdb', mount_point='/mnt/volume',
|
||||
@ -234,6 +305,8 @@ class API(object):
|
||||
"""
|
||||
LOG.debug("Sending the call to prepare the Guest.")
|
||||
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
# Taskmanager is a publisher, guestagent is a consumer. Usually
|
||||
# consumer creates a queue, but in this case we have to make sure
|
||||
# "prepare" doesn't get lost if for some reason guest was delayed and
|
||||
@ -242,7 +315,7 @@ class API(object):
|
||||
|
||||
packages = packages.split()
|
||||
self._cast(
|
||||
"prepare", self.version_cap, packages=packages,
|
||||
"prepare", version=version, packages=packages,
|
||||
databases=databases, memory_mb=memory_mb, users=users,
|
||||
device_path=device_path, mount_point=mount_point,
|
||||
backup_info=backup_info, config_contents=config_contents,
|
||||
@ -258,7 +331,7 @@ class API(object):
|
||||
server = None
|
||||
target = messaging.Target(topic=self._get_routing_key(),
|
||||
server=self.id,
|
||||
version=rpc_version.RPC_API_VERSION)
|
||||
version=self.API_BASE_VERSION)
|
||||
try:
|
||||
server = rpc.get_server(target, [])
|
||||
server.start()
|
||||
@ -270,26 +343,35 @@ class API(object):
|
||||
def pre_upgrade(self):
|
||||
"""Prepare the guest for upgrade."""
|
||||
LOG.debug("Sending the call to prepare the guest for upgrade.")
|
||||
return self._call("pre_upgrade", AGENT_HIGH_TIMEOUT, self.version_cap)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
return self._call("pre_upgrade", AGENT_HIGH_TIMEOUT,
|
||||
version=version)
|
||||
|
||||
def post_upgrade(self, upgrade_info):
|
||||
"""Recover the guest after upgrading the guest's image."""
|
||||
LOG.debug("Recover the guest after upgrading the guest's image.")
|
||||
self._call("post_upgrade", AGENT_HIGH_TIMEOUT, self.version_cap,
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._call("post_upgrade", AGENT_HIGH_TIMEOUT, version=version,
|
||||
upgrade_info=upgrade_info)
|
||||
|
||||
def restart(self):
|
||||
"""Restart the database server."""
|
||||
LOG.debug("Sending the call to restart the database process "
|
||||
"on the Guest.")
|
||||
self._call("restart", AGENT_HIGH_TIMEOUT, self.version_cap)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._call("restart", AGENT_HIGH_TIMEOUT, version=version)
|
||||
|
||||
def start_db_with_conf_changes(self, config_contents):
|
||||
"""Start the database server."""
|
||||
LOG.debug("Sending the call to start the database process on "
|
||||
"the Guest with a timeout of %s." % AGENT_HIGH_TIMEOUT)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._call("start_db_with_conf_changes", AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap, config_contents=config_contents)
|
||||
version=version, config_contents=config_contents)
|
||||
|
||||
def reset_configuration(self, configuration):
|
||||
"""Ignore running state of the database server; just change
|
||||
@ -297,20 +379,26 @@ class API(object):
|
||||
"""
|
||||
LOG.debug("Sending the call to change the database conf file on the "
|
||||
"Guest with a timeout of %s." % AGENT_HIGH_TIMEOUT)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._call("reset_configuration", AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap, configuration=configuration)
|
||||
version=version, configuration=configuration)
|
||||
|
||||
def stop_db(self, do_not_start_on_reboot=False):
|
||||
"""Stop the database server."""
|
||||
LOG.debug("Sending the call to stop the database process "
|
||||
"on the Guest.")
|
||||
self._call("stop_db", AGENT_HIGH_TIMEOUT, self.version_cap,
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._call("stop_db", AGENT_HIGH_TIMEOUT, version=version,
|
||||
do_not_start_on_reboot=do_not_start_on_reboot)
|
||||
|
||||
def upgrade(self, instance_version, location, metadata=None):
|
||||
"""Make an asynchronous call to self upgrade the guest agent."""
|
||||
LOG.debug("Sending an upgrade call to nova-guest.")
|
||||
self._cast("upgrade", self.version_cap,
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._cast("upgrade", version=version,
|
||||
instance_version=instance_version,
|
||||
location=location,
|
||||
metadata=metadata)
|
||||
@ -318,158 +406,214 @@ class API(object):
|
||||
def get_volume_info(self):
|
||||
"""Make a synchronous call to get volume info for the container."""
|
||||
LOG.debug("Check Volume Info on instance %s.", self.id)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
return self._call("get_filesystem_stats", AGENT_LOW_TIMEOUT,
|
||||
self.version_cap, fs_path=None)
|
||||
version=version, fs_path=None)
|
||||
|
||||
def update_guest(self):
|
||||
"""Make a synchronous call to update the guest agent."""
|
||||
LOG.debug("Updating guest agent on instance %s.", self.id)
|
||||
self._call("update_guest", AGENT_HIGH_TIMEOUT, self.version_cap)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._call("update_guest", AGENT_HIGH_TIMEOUT, version=version)
|
||||
|
||||
def create_backup(self, backup_info):
|
||||
"""Make async call to create a full backup of this instance."""
|
||||
LOG.debug("Create Backup %(backup_id)s "
|
||||
"for instance %(instance_id)s." %
|
||||
{'backup_id': backup_info['id'], 'instance_id': self.id})
|
||||
self._cast("create_backup", self.version_cap, backup_info=backup_info)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._cast("create_backup", version=version,
|
||||
backup_info=backup_info)
|
||||
|
||||
def mount_volume(self, device_path=None, mount_point=None):
|
||||
"""Mount the volume."""
|
||||
LOG.debug("Mount volume %(mount)s on instance %(id)s." % {
|
||||
'mount': mount_point, 'id': self.id})
|
||||
self._call("mount_volume", AGENT_LOW_TIMEOUT, self.version_cap,
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._call("mount_volume", AGENT_LOW_TIMEOUT, version=version,
|
||||
device_path=device_path, mount_point=mount_point)
|
||||
|
||||
def unmount_volume(self, device_path=None, mount_point=None):
|
||||
"""Unmount the volume."""
|
||||
LOG.debug("Unmount volume %(device)s on instance %(id)s." % {
|
||||
'device': device_path, 'id': self.id})
|
||||
self._call("unmount_volume", AGENT_LOW_TIMEOUT, self.version_cap,
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._call("unmount_volume", AGENT_LOW_TIMEOUT, version=version,
|
||||
device_path=device_path, mount_point=mount_point)
|
||||
|
||||
def resize_fs(self, device_path=None, mount_point=None):
|
||||
"""Resize the filesystem."""
|
||||
LOG.debug("Resize device %(device)s on instance %(id)s." % {
|
||||
'device': device_path, 'id': self.id})
|
||||
self._call("resize_fs", AGENT_HIGH_TIMEOUT, self.version_cap,
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._call("resize_fs", AGENT_HIGH_TIMEOUT, version=version,
|
||||
device_path=device_path, mount_point=mount_point)
|
||||
|
||||
def update_overrides(self, overrides, remove=False):
|
||||
"""Update the overrides."""
|
||||
LOG.debug("Updating overrides values %(overrides)s on instance "
|
||||
"%(id)s.", {'overrides': overrides, 'id': self.id})
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._call("update_overrides", AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap, overrides=overrides, remove=remove)
|
||||
version=version, overrides=overrides, remove=remove)
|
||||
|
||||
def apply_overrides(self, overrides):
|
||||
LOG.debug("Applying overrides values %(overrides)s on instance "
|
||||
"%(id)s.", {'overrides': overrides, 'id': self.id})
|
||||
self._call("apply_overrides", AGENT_HIGH_TIMEOUT, self.version_cap,
|
||||
overrides=overrides)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._call("apply_overrides", AGENT_HIGH_TIMEOUT,
|
||||
version=version, overrides=overrides)
|
||||
|
||||
def backup_required_for_replication(self):
|
||||
LOG.debug("Checking backup requirement for replication")
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
return self._call("backup_required_for_replication",
|
||||
AGENT_LOW_TIMEOUT,
|
||||
self.version_cap)
|
||||
version=version)
|
||||
|
||||
def get_replication_snapshot(self, snapshot_info=None,
|
||||
replica_source_config=None):
|
||||
LOG.debug("Retrieving replication snapshot from instance %s.", self.id)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
return self._call("get_replication_snapshot", AGENT_SNAPSHOT_TIMEOUT,
|
||||
self.version_cap, snapshot_info=snapshot_info,
|
||||
version=version, snapshot_info=snapshot_info,
|
||||
replica_source_config=replica_source_config)
|
||||
|
||||
def attach_replication_slave(self, snapshot, replica_config=None):
|
||||
LOG.debug("Configuring instance %s to replicate from %s.",
|
||||
self.id, snapshot.get('master').get('id'))
|
||||
self._cast("attach_replication_slave", self.version_cap,
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._cast("attach_replication_slave", version=version,
|
||||
snapshot=snapshot, slave_config=replica_config)
|
||||
|
||||
def detach_replica(self, for_failover=False):
|
||||
LOG.debug("Detaching replica %s from its replication source.", self.id)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
return self._call("detach_replica", AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap, for_failover=for_failover)
|
||||
version=version, for_failover=for_failover)
|
||||
|
||||
def get_replica_context(self):
|
||||
LOG.debug("Getting replica context.")
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
return self._call("get_replica_context",
|
||||
AGENT_HIGH_TIMEOUT, self.version_cap)
|
||||
AGENT_HIGH_TIMEOUT, version=version)
|
||||
|
||||
def attach_replica(self, replica_info, slave_config):
|
||||
LOG.debug("Attaching replica %s." % replica_info)
|
||||
self._call("attach_replica", AGENT_HIGH_TIMEOUT, self.version_cap,
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._call("attach_replica", AGENT_HIGH_TIMEOUT, version=version,
|
||||
replica_info=replica_info, slave_config=slave_config)
|
||||
|
||||
def make_read_only(self, read_only):
|
||||
LOG.debug("Executing make_read_only(%s)" % read_only)
|
||||
self._call("make_read_only", AGENT_HIGH_TIMEOUT, self.version_cap,
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._call("make_read_only", AGENT_HIGH_TIMEOUT, version=version,
|
||||
read_only=read_only)
|
||||
|
||||
def enable_as_master(self, replica_source_config):
|
||||
LOG.debug("Executing enable_as_master")
|
||||
self._call("enable_as_master", AGENT_HIGH_TIMEOUT, self.version_cap,
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._call("enable_as_master", AGENT_HIGH_TIMEOUT,
|
||||
version=version,
|
||||
replica_source_config=replica_source_config)
|
||||
|
||||
# DEPRECATED: Maintain for API Compatibility
|
||||
def get_txn_count(self):
|
||||
LOG.debug("Executing get_txn_count.")
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
return self._call("get_txn_count",
|
||||
AGENT_HIGH_TIMEOUT, self.version_cap)
|
||||
AGENT_HIGH_TIMEOUT, version=version)
|
||||
|
||||
def get_last_txn(self):
|
||||
LOG.debug("Executing get_last_txn.")
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
return self._call("get_last_txn",
|
||||
AGENT_HIGH_TIMEOUT, self.version_cap)
|
||||
AGENT_HIGH_TIMEOUT, version=version)
|
||||
|
||||
def get_latest_txn_id(self):
|
||||
LOG.debug("Executing get_latest_txn_id.")
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
return self._call("get_latest_txn_id",
|
||||
AGENT_HIGH_TIMEOUT, self.version_cap)
|
||||
AGENT_HIGH_TIMEOUT, version=version)
|
||||
|
||||
def wait_for_txn(self, txn):
|
||||
LOG.debug("Executing wait_for_txn.")
|
||||
self._call("wait_for_txn", AGENT_HIGH_TIMEOUT, self.version_cap,
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._call("wait_for_txn", AGENT_HIGH_TIMEOUT, version=version,
|
||||
txn=txn)
|
||||
|
||||
def cleanup_source_on_replica_detach(self, replica_info):
|
||||
LOG.debug("Cleaning up master %s on detach of replica.", self.id)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._call("cleanup_source_on_replica_detach", AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap, replica_info=replica_info)
|
||||
version=version, replica_info=replica_info)
|
||||
|
||||
def demote_replication_master(self):
|
||||
LOG.debug("Demoting instance %s to non-master.", self.id)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._call("demote_replication_master", AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap)
|
||||
version=version)
|
||||
|
||||
def guest_log_list(self):
|
||||
LOG.debug("Retrieving guest log list for %s.", self.id)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
result = self._call("guest_log_list", AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap)
|
||||
version=version)
|
||||
LOG.debug("guest_log_list returns %s", result)
|
||||
return result
|
||||
|
||||
def guest_log_action(self, log_name, enable, disable, publish, discard):
|
||||
LOG.debug("Processing guest log '%s' for %s.", log_name, self.id)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
return self._call("guest_log_action", AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap, log_name=log_name,
|
||||
version=version, log_name=log_name,
|
||||
enable=enable, disable=disable,
|
||||
publish=publish, discard=discard)
|
||||
|
||||
def module_list(self, include_contents):
|
||||
LOG.debug("Querying modules on %s (contents: %s).",
|
||||
self.id, include_contents)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
result = self._call("module_list", AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap,
|
||||
version=version,
|
||||
include_contents=include_contents)
|
||||
return result
|
||||
|
||||
def module_apply(self, modules):
|
||||
LOG.debug("Applying modules to %s.", self.id)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
return self._call("module_apply", AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap, modules=modules)
|
||||
version=version, modules=modules)
|
||||
|
||||
def module_remove(self, module):
|
||||
LOG.debug("Removing modules from %s.", self.id)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
return self._call("module_remove", AGENT_HIGH_TIMEOUT,
|
||||
self.version_cap, module=module)
|
||||
version=version, module=module)
|
||||
|
@ -24,7 +24,6 @@ import oslo_messaging as messaging
|
||||
from trove.common import cfg
|
||||
from trove.common import exception
|
||||
from trove.common.notification import NotificationCastWrapper
|
||||
import trove.common.rpc.version as rpc_version
|
||||
from trove.common.strategies.cluster import strategy
|
||||
from trove.guestagent import models as agent_models
|
||||
from trove import rpc
|
||||
@ -34,18 +33,42 @@ LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class API(object):
|
||||
"""API for interacting with the task manager."""
|
||||
"""API for interacting with the task manager.
|
||||
|
||||
API version history:
|
||||
* 1.0 - Initial version.
|
||||
|
||||
When updating this API, also update API_LATEST_VERSION
|
||||
"""
|
||||
|
||||
# API_LATEST_VERSION should bump the minor number each time
|
||||
# a method signature is added or changed
|
||||
API_LATEST_VERSION = '1.0'
|
||||
|
||||
# API_BASE_VERSION should only change on major version upgrade
|
||||
API_BASE_VERSION = '1.0'
|
||||
|
||||
VERSION_ALIASES = {
|
||||
'icehouse': '1.0',
|
||||
'juno': '1.0',
|
||||
'kilo': '1.0',
|
||||
'liberty': '1.0',
|
||||
'mitaka': '1.0',
|
||||
'newton': '1.0',
|
||||
|
||||
'latest': API_LATEST_VERSION
|
||||
}
|
||||
|
||||
def __init__(self, context):
|
||||
self.context = context
|
||||
super(API, self).__init__()
|
||||
|
||||
version_cap = self.VERSION_ALIASES.get(
|
||||
CONF.upgrade_levels.taskmanager, CONF.upgrade_levels.taskmanager)
|
||||
target = messaging.Target(topic=CONF.taskmanager_queue,
|
||||
version=rpc_version.RPC_API_VERSION)
|
||||
version=version_cap)
|
||||
|
||||
self.version_cap = rpc_version.VERSION_ALIASES.get(
|
||||
CONF.upgrade_levels.taskmanager)
|
||||
self.client = self.get_client(target, self.version_cap)
|
||||
self.client = self.get_client(target, version_cap)
|
||||
|
||||
def _cast(self, method_name, version, **kwargs):
|
||||
LOG.debug("Casting %s" % method_name)
|
||||
@ -79,72 +102,83 @@ class API(object):
|
||||
def resize_volume(self, new_size, instance_id):
|
||||
LOG.debug("Making async call to resize volume for instance: %s"
|
||||
% instance_id)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._cast("resize_volume", self.version_cap,
|
||||
self._cast("resize_volume", version=version,
|
||||
new_size=new_size,
|
||||
instance_id=instance_id)
|
||||
|
||||
def resize_flavor(self, instance_id, old_flavor, new_flavor):
|
||||
LOG.debug("Making async call to resize flavor for instance: %s" %
|
||||
instance_id)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._cast("resize_flavor", self.version_cap,
|
||||
self._cast("resize_flavor", version=version,
|
||||
instance_id=instance_id,
|
||||
old_flavor=self._transform_obj(old_flavor),
|
||||
new_flavor=self._transform_obj(new_flavor))
|
||||
|
||||
def reboot(self, instance_id):
|
||||
LOG.debug("Making async call to reboot instance: %s" % instance_id)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._cast("reboot", self.version_cap, instance_id=instance_id)
|
||||
self._cast("reboot", version=version, instance_id=instance_id)
|
||||
|
||||
def restart(self, instance_id):
|
||||
LOG.debug("Making async call to restart instance: %s" % instance_id)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._cast("restart", self.version_cap, instance_id=instance_id)
|
||||
self._cast("restart", version=version, instance_id=instance_id)
|
||||
|
||||
def detach_replica(self, instance_id):
|
||||
LOG.debug("Making async call to detach replica: %s" % instance_id)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._cast("detach_replica", self.version_cap,
|
||||
self._cast("detach_replica", version=version,
|
||||
instance_id=instance_id)
|
||||
|
||||
def promote_to_replica_source(self, instance_id):
|
||||
LOG.debug("Making async call to promote replica to source: %s" %
|
||||
instance_id)
|
||||
self._cast("promote_to_replica_source", self.version_cap,
|
||||
version = self.API_BASE_VERSION
|
||||
self._cast("promote_to_replica_source", version=version,
|
||||
instance_id=instance_id)
|
||||
|
||||
def eject_replica_source(self, instance_id):
|
||||
LOG.debug("Making async call to eject replica source: %s" %
|
||||
instance_id)
|
||||
self._cast("eject_replica_source", self.version_cap,
|
||||
version = self.API_BASE_VERSION
|
||||
self._cast("eject_replica_source", version=version,
|
||||
instance_id=instance_id)
|
||||
|
||||
def migrate(self, instance_id, host):
|
||||
LOG.debug("Making async call to migrate instance: %s" % instance_id)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._cast("migrate", self.version_cap,
|
||||
self._cast("migrate", version=version,
|
||||
instance_id=instance_id, host=host)
|
||||
|
||||
def delete_instance(self, instance_id):
|
||||
LOG.debug("Making async call to delete instance: %s" % instance_id)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._cast("delete_instance", self.version_cap,
|
||||
self._cast("delete_instance", version=version,
|
||||
instance_id=instance_id)
|
||||
|
||||
def create_backup(self, backup_info, instance_id):
|
||||
LOG.debug("Making async call to create a backup for instance: %s" %
|
||||
instance_id)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._cast("create_backup", self.version_cap,
|
||||
self._cast("create_backup", version=version,
|
||||
backup_info=backup_info,
|
||||
instance_id=instance_id)
|
||||
|
||||
def delete_backup(self, backup_id):
|
||||
LOG.debug("Making async call to delete backup: %s" % backup_id)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._cast("delete_backup", self.version_cap, backup_id=backup_id)
|
||||
self._cast("delete_backup", version=version, backup_id=backup_id)
|
||||
|
||||
def create_instance(self, instance_id, name, flavor,
|
||||
image_id, databases, users, datastore_manager,
|
||||
@ -155,7 +189,8 @@ class API(object):
|
||||
modules=None, locality=None):
|
||||
|
||||
LOG.debug("Making async call to create instance %s " % instance_id)
|
||||
self._cast("create_instance", self.version_cap,
|
||||
version = self.API_BASE_VERSION
|
||||
self._cast("create_instance", version=version,
|
||||
instance_id=instance_id, name=name,
|
||||
flavor=self._transform_obj(flavor),
|
||||
image_id=image_id,
|
||||
@ -176,33 +211,38 @@ class API(object):
|
||||
|
||||
def create_cluster(self, cluster_id):
|
||||
LOG.debug("Making async call to create cluster %s " % cluster_id)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._cast("create_cluster", self.version_cap, cluster_id=cluster_id)
|
||||
self._cast("create_cluster", version=version, cluster_id=cluster_id)
|
||||
|
||||
def grow_cluster(self, cluster_id, new_instance_ids):
|
||||
LOG.debug("Making async call to grow cluster %s " % cluster_id)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
cctxt = self.client.prepare(version=self.version_cap)
|
||||
cctxt = self.client.prepare(version=version)
|
||||
cctxt.cast(self.context, "grow_cluster",
|
||||
cluster_id=cluster_id, new_instance_ids=new_instance_ids)
|
||||
|
||||
def shrink_cluster(self, cluster_id, instance_ids):
|
||||
LOG.debug("Making async call to shrink cluster %s " % cluster_id)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
cctxt = self.client.prepare(version=self.version_cap)
|
||||
cctxt = self.client.prepare(version=version)
|
||||
cctxt.cast(self.context, "shrink_cluster",
|
||||
cluster_id=cluster_id, instance_ids=instance_ids)
|
||||
|
||||
def delete_cluster(self, cluster_id):
|
||||
LOG.debug("Making async call to delete cluster %s " % cluster_id)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
self._cast("delete_cluster", self.version_cap, cluster_id=cluster_id)
|
||||
self._cast("delete_cluster", version=version, cluster_id=cluster_id)
|
||||
|
||||
def upgrade(self, instance_id, datastore_version_id):
|
||||
LOG.debug("Making async call to upgrade guest to datastore "
|
||||
"version %s " % datastore_version_id)
|
||||
version = self.API_BASE_VERSION
|
||||
|
||||
cctxt = self.client.prepare(version=self.version_cap)
|
||||
cctxt = self.client.prepare(version=version)
|
||||
cctxt.cast(self.context, "upgrade", instance_id=instance_id,
|
||||
datastore_version_id=datastore_version_id)
|
||||
|
||||
|
@ -14,7 +14,6 @@
|
||||
# under the License.
|
||||
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging as messaging
|
||||
from oslo_service import periodic_task
|
||||
from oslo_utils import importutils
|
||||
|
||||
@ -27,7 +26,6 @@ from trove.common.exception import TroveError
|
||||
from trove.common.i18n import _
|
||||
from trove.common.notification import DBaaSQuotas, EndNotification
|
||||
from trove.common import remote
|
||||
import trove.common.rpc.version as rpc_version
|
||||
from trove.common import server_group as srv_grp
|
||||
from trove.common.strategies.cluster import strategy
|
||||
from trove.datastore.models import DatastoreVersion
|
||||
@ -43,8 +41,6 @@ CONF = cfg.CONF
|
||||
|
||||
class Manager(periodic_task.PeriodicTasks):
|
||||
|
||||
target = messaging.Target(version=rpc_version.RPC_API_VERSION)
|
||||
|
||||
def __init__(self):
|
||||
super(Manager, self).__init__(CONF)
|
||||
self.admin_context = TroveContext(
|
||||
|
Loading…
x
Reference in New Issue
Block a user