diff --git a/trove/cmd/conductor.py b/trove/cmd/conductor.py index 66499190fb..daff5df4e5 100644 --- a/trove/cmd/conductor.py +++ b/trove/cmd/conductor.py @@ -16,13 +16,13 @@ from oslo_concurrency import processutils from oslo_service import service as openstack_service from trove.cmd.common import with_initialize +from trove.conductor import api as conductor_api @with_initialize def main(conf): from trove.common import notification from trove.common.rpc import service as rpc_service - from trove.common.rpc import version as rpc_version from trove.instance import models as inst_models notification.DBaaSAPINotification.register_notify_callback( @@ -30,7 +30,7 @@ def main(conf): topic = conf.conductor_queue server = rpc_service.RpcService( manager=conf.conductor_manager, topic=topic, - rpc_api_version=rpc_version.RPC_API_VERSION) + rpc_api_version=conductor_api.API.API_LATEST_VERSION) workers = conf.trove_conductor_workers or processutils.get_worker_count() launcher = openstack_service.launch(conf, server, workers=workers) launcher.wait() diff --git a/trove/cmd/guest.py b/trove/cmd/guest.py index 9f2b1807d0..ccb33563c5 100644 --- a/trove/cmd/guest.py +++ b/trove/cmd/guest.py @@ -25,6 +25,7 @@ from oslo_service import service as openstack_service from trove.common import cfg from trove.common import debug_utils from trove.common.i18n import _LE +from trove.guestagent import api as guest_api CONF = cfg.CONF # The guest_id opt definition must match the one in common/cfg.py @@ -57,11 +58,10 @@ def main(): rpc.init(CONF) from trove.common.rpc import service as rpc_service - from trove.common.rpc import version as rpc_version server = rpc_service.RpcService( topic="guestagent.%s" % CONF.guest_id, manager=manager, host=CONF.guest_id, - rpc_api_version=rpc_version.RPC_API_VERSION) + rpc_api_version=guest_api.API.API_LATEST_VERSION) launcher = openstack_service.launch(CONF, server) launcher.wait() diff --git a/trove/cmd/taskmanager.py b/trove/cmd/taskmanager.py index 58f7ed125a..aaef017c66 100644 --- a/trove/cmd/taskmanager.py +++ b/trove/cmd/taskmanager.py @@ -16,6 +16,7 @@ from oslo_config import cfg as openstack_cfg from oslo_service import service as openstack_service from trove.cmd.common import with_initialize +from trove.taskmanager import api as task_api extra_opts = [openstack_cfg.StrOpt('taskmanager_manager')] @@ -24,14 +25,13 @@ extra_opts = [openstack_cfg.StrOpt('taskmanager_manager')] def startup(conf, topic): from trove.common import notification from trove.common.rpc import service as rpc_service - from trove.common.rpc import version as rpc_version from trove.instance import models as inst_models notification.DBaaSAPINotification.register_notify_callback( inst_models.persist_instance_fault) server = rpc_service.RpcService( manager=conf.taskmanager_manager, topic=topic, - rpc_api_version=rpc_version.RPC_API_VERSION) + rpc_api_version=task_api.API.API_LATEST_VERSION) launcher = openstack_service.launch(conf, server) launcher.wait() diff --git a/trove/common/cfg.py b/trove/common/cfg.py index 908d0a59ad..c34968583a 100644 --- a/trove/common/cfg.py +++ b/trove/common/cfg.py @@ -1449,18 +1449,19 @@ mariadb_opts = [ upgrade_levels = cfg.OptGroup( 'upgrade_levels', title='RPC upgrade levels group for handling versions', - help='Contains the support version caps for each RPC API') + help='Contains the support version caps (Openstack Release) for ' + 'each RPC API') rpcapi_cap_opts = [ cfg.StrOpt( - 'taskmanager', default="icehouse", + 'taskmanager', default='latest', help='Set a version cap for messages sent to taskmanager services'), cfg.StrOpt( - 'guestagent', default="icehouse", + 'guestagent', default='latest', help='Set a version cap for messages sent to guestagent services'), cfg.StrOpt( - 'conductor', default="icehouse", - help='Set a version cap for messages sent to conductor services'), + 'conductor', default='latest', + help='Set Openstack Release compatibility for conductor services'), ] CONF = cfg.CONF diff --git a/trove/common/strategies/cluster/experimental/cassandra/guestagent.py b/trove/common/strategies/cluster/experimental/cassandra/guestagent.py index 6bdd8bc261..c8b89482f4 100644 --- a/trove/common/strategies/cluster/experimental/cassandra/guestagent.py +++ b/trove/common/strategies/cluster/experimental/cassandra/guestagent.py @@ -30,65 +30,100 @@ class CassandraGuestAgentStrategy(base.BaseGuestAgentStrategy): class CassandraGuestAgentAPI(guest_api.API): + """Cluster Specific Datastore Guest API + + **** VERSION CONTROLLED API **** + + The methods in this class are subject to version control as + coordinated by guestagent/api.py. Whenever a change is made to + any API method in this class, add a version number and comment + to the top of guestagent/api.py and use the version number as + appropriate in this file + """ def get_data_center(self): LOG.debug("Retrieving the data center for node: %s" % self.id) + version = guest_api.API.API_BASE_VERSION + return self._call("get_data_center", guest_api.AGENT_LOW_TIMEOUT, - self.version_cap) + version=version) def get_rack(self): LOG.debug("Retrieving the rack for node: %s" % self.id) + version = guest_api.API.API_BASE_VERSION + return self._call("get_rack", guest_api.AGENT_LOW_TIMEOUT, - self.version_cap) + version=version) def set_seeds(self, seeds): LOG.debug("Configuring the gossip seeds for node: %s" % self.id) + version = guest_api.API.API_BASE_VERSION + return self._call("set_seeds", guest_api.AGENT_LOW_TIMEOUT, - self.version_cap, seeds=seeds) + version=version, seeds=seeds) def get_seeds(self): LOG.debug("Retrieving the gossip seeds for node: %s" % self.id) + version = guest_api.API.API_BASE_VERSION + return self._call("get_seeds", guest_api.AGENT_LOW_TIMEOUT, - self.version_cap) + version=version) def set_auto_bootstrap(self, enabled): LOG.debug("Setting the auto-bootstrap to '%s' for node: %s" % (enabled, self.id)) + version = guest_api.API.API_BASE_VERSION + return self._call("set_auto_bootstrap", guest_api.AGENT_LOW_TIMEOUT, - self.version_cap, enabled=enabled) + version=version, enabled=enabled) def cluster_complete(self): LOG.debug("Sending a setup completion notification for node: %s" % self.id) + version = guest_api.API.API_BASE_VERSION + return self._call("cluster_complete", guest_api.AGENT_HIGH_TIMEOUT, - self.version_cap) + version=version) def node_cleanup_begin(self): LOG.debug("Signaling the node to prepare for cleanup: %s" % self.id) + version = guest_api.API.API_BASE_VERSION + return self._call("node_cleanup_begin", guest_api.AGENT_LOW_TIMEOUT, - self.version_cap) + version=version) def node_cleanup(self): LOG.debug("Running cleanup on node: %s" % self.id) - return self._cast('node_cleanup', self.version_cap) + version = guest_api.API.API_BASE_VERSION + + return self._cast('node_cleanup', version=version) def node_decommission(self): LOG.debug("Decommission node: %s" % self.id) - return self._cast("node_decommission", self.version_cap) + version = guest_api.API.API_BASE_VERSION + + return self._cast("node_decommission", version=version) def cluster_secure(self, password): LOG.debug("Securing the cluster via node: %s" % self.id) + version = guest_api.API.API_BASE_VERSION + return self._call( "cluster_secure", guest_api.AGENT_HIGH_TIMEOUT, - self.version_cap, password=password) + version=version, password=password) def get_admin_credentials(self): LOG.debug("Retrieving the admin credentials from node: %s" % self.id) + version = guest_api.API.API_BASE_VERSION + return self._call("get_admin_credentials", guest_api.AGENT_LOW_TIMEOUT, - self.version_cap) + version=version) def store_admin_credentials(self, admin_credentials): LOG.debug("Storing the admin credentials on node: %s" % self.id) + version = guest_api.API.API_BASE_VERSION + return self._call("store_admin_credentials", - guest_api.AGENT_LOW_TIMEOUT, self.version_cap, + guest_api.AGENT_LOW_TIMEOUT, + version=version, admin_credentials=admin_credentials) diff --git a/trove/common/strategies/cluster/experimental/galera_common/guestagent.py b/trove/common/strategies/cluster/experimental/galera_common/guestagent.py index 7510e29a8b..1161c38d36 100644 --- a/trove/common/strategies/cluster/experimental/galera_common/guestagent.py +++ b/trove/common/strategies/cluster/experimental/galera_common/guestagent.py @@ -31,39 +31,59 @@ class GaleraCommonGuestAgentStrategy(cluster_base.BaseGuestAgentStrategy): class GaleraCommonGuestAgentAPI(guest_api.API): + """Cluster Specific Datastore Guest API + + **** VERSION CONTROLLED API **** + + The methods in this class are subject to version control as + coordinated by guestagent/api.py. Whenever a change is made to + any API method in this class, add a version number and comment + to the top of guestagent/api.py and use the version number as + appropriate in this file + """ def install_cluster(self, replication_user, cluster_configuration, bootstrap): """Install the cluster.""" LOG.debug("Installing Galera cluster.") + version = guest_api.API.API_BASE_VERSION + self._call("install_cluster", CONF.cluster_usage_timeout, - self.version_cap, + version=version, replication_user=replication_user, cluster_configuration=cluster_configuration, bootstrap=bootstrap) def reset_admin_password(self, admin_password): """Store this password on the instance as the admin password.""" + version = guest_api.API.API_BASE_VERSION + self._call("reset_admin_password", CONF.cluster_usage_timeout, - self.version_cap, + version=version, admin_password=admin_password) def cluster_complete(self): """Set the status that the cluster is build is complete.""" LOG.debug("Notifying cluster install completion.") + version = guest_api.API.API_BASE_VERSION + return self._call("cluster_complete", guest_api.AGENT_HIGH_TIMEOUT, - self.version_cap) + version=version) def get_cluster_context(self): """Get the context of the cluster.""" LOG.debug("Getting the cluster context.") + version = guest_api.API.API_BASE_VERSION + return self._call("get_cluster_context", guest_api.AGENT_HIGH_TIMEOUT, - self.version_cap) + version=version) def write_cluster_configuration_overrides(self, cluster_configuration): """Write an updated the cluster configuration.""" LOG.debug("Writing an updated the cluster configuration.") + version = guest_api.API.API_BASE_VERSION + self._call("write_cluster_configuration_overrides", guest_api.AGENT_HIGH_TIMEOUT, - self.version_cap, + version=version, cluster_configuration=cluster_configuration) diff --git a/trove/common/strategies/cluster/experimental/mongodb/guestagent.py b/trove/common/strategies/cluster/experimental/mongodb/guestagent.py index 1566f34fa5..08e1a8a91e 100644 --- a/trove/common/strategies/cluster/experimental/mongodb/guestagent.py +++ b/trove/common/strategies/cluster/experimental/mongodb/guestagent.py @@ -33,6 +33,16 @@ class MongoDbGuestAgentStrategy(base.BaseGuestAgentStrategy): class MongoDbGuestAgentAPI(guest_api.API): + """Cluster Specific Datastore Guest API + + **** VERSION CONTROLLED API **** + + The methods in this class are subject to version control as + coordinated by guestagent/api.py. Whenever a change is made to + any API method in this class, add a version number and comment + to the top of guestagent/api.py and use the version number as + appropriate in this file + """ def add_shard(self, replica_set_name, replica_set_member): LOG.debug("Adding shard with replSet %(replica_set_name)s and member " @@ -40,66 +50,89 @@ class MongoDbGuestAgentAPI(guest_api.API): "%(id)s" % {'replica_set_name': replica_set_name, 'replica_set_member': replica_set_member, 'id': self.id}) + version = guest_api.API.API_BASE_VERSION + return self._call("add_shard", guest_api.AGENT_HIGH_TIMEOUT, - self.version_cap, + version=version, replica_set_name=replica_set_name, replica_set_member=replica_set_member) def add_members(self, members): LOG.debug("Adding members %(members)s on instance %(id)s" % { 'members': members, 'id': self.id}) + version = guest_api.API.API_BASE_VERSION + return self._call("add_members", ADD_MEMBERS_TIMEOUT, - self.version_cap, members=members) + version=version, members=members) def add_config_servers(self, config_servers): LOG.debug("Adding config servers %(config_servers)s for instance " "%(id)s" % {'config_servers': config_servers, 'id': self.id}) + version = guest_api.API.API_BASE_VERSION + return self._call("add_config_servers", guest_api.AGENT_HIGH_TIMEOUT, - self.version_cap, config_servers=config_servers) + version=version, + config_servers=config_servers) def cluster_complete(self): LOG.debug("Notify regarding cluster install completion") + version = guest_api.API.API_BASE_VERSION + return self._call("cluster_complete", guest_api.AGENT_HIGH_TIMEOUT, - self.version_cap) + version=version) def get_key(self): LOG.debug("Requesting cluster key from guest") + version = guest_api.API.API_BASE_VERSION + return self._call("get_key", guest_api.AGENT_LOW_TIMEOUT, - self.version_cap) + version=version) def prep_primary(self): LOG.debug("Preparing member to be primary member.") + version = guest_api.API.API_BASE_VERSION + return self._call("prep_primary", guest_api.AGENT_HIGH_TIMEOUT, - self.version_cap) + version=version) def create_admin_user(self, password): LOG.debug("Creating admin user") + version = guest_api.API.API_BASE_VERSION + return self._call("create_admin_user", guest_api.AGENT_HIGH_TIMEOUT, - self.version_cap, password=password) + version=version, password=password) def store_admin_password(self, password): LOG.debug("Storing admin password") + version = guest_api.API.API_BASE_VERSION + return self._call("store_admin_password", guest_api.AGENT_LOW_TIMEOUT, - self.version_cap, + version=version, password=password) def get_replica_set_name(self): LOG.debug("Querying member for its replica set name") + version = guest_api.API.API_BASE_VERSION + return self._call("get_replica_set_name", guest_api.AGENT_HIGH_TIMEOUT, - self.version_cap) + version=version) def get_admin_password(self): LOG.debug("Querying instance for its admin password") + version = guest_api.API.API_BASE_VERSION + return self._call("get_admin_password", guest_api.AGENT_LOW_TIMEOUT, - self.version_cap) + version=version) def is_shard_active(self, replica_set_name): LOG.debug("Checking if replica set %s is active" % replica_set_name) + version = guest_api.API.API_BASE_VERSION + return self._call("is_shard_active", guest_api.AGENT_HIGH_TIMEOUT, - self.version_cap, + version=version, replica_set_name=replica_set_name) diff --git a/trove/common/strategies/cluster/experimental/mongodb/taskmanager.py b/trove/common/strategies/cluster/experimental/mongodb/taskmanager.py index 66273d7c2c..c4a3c1f918 100644 --- a/trove/common/strategies/cluster/experimental/mongodb/taskmanager.py +++ b/trove/common/strategies/cluster/experimental/mongodb/taskmanager.py @@ -379,7 +379,8 @@ class MongoDbTaskManagerAPI(task_api.API): def mongodb_add_shard_cluster(self, cluster_id, shard_id, replica_set_name): LOG.debug("Making async call to add shard cluster %s " % cluster_id) - cctxt = self.client.prepare(version=self.version_cap) + version = task_api.API.API_BASE_VERSION + cctxt = self.client.prepare(version=version) cctxt.cast(self.context, "add_shard_cluster", cluster_id=cluster_id, diff --git a/trove/common/strategies/cluster/experimental/redis/guestagent.py b/trove/common/strategies/cluster/experimental/redis/guestagent.py index d8ec3f3b41..1f8f2c4d39 100644 --- a/trove/common/strategies/cluster/experimental/redis/guestagent.py +++ b/trove/common/strategies/cluster/experimental/redis/guestagent.py @@ -28,34 +28,59 @@ class RedisGuestAgentStrategy(base.BaseGuestAgentStrategy): class RedisGuestAgentAPI(guest_api.API): + """Cluster Specific Datastore Guest API + + **** VERSION CONTROLLED API **** + + The methods in this class are subject to version control as + coordinated by guestagent/api.py. Whenever a change is made to + any API method in this class, add a version number and comment + to the top of guestagent/api.py and use the version number as + appropriate in this file + """ def get_node_ip(self): LOG.debug("Retrieve ip info from node.") + version = guest_api.API.API_BASE_VERSION + return self._call("get_node_ip", - guest_api.AGENT_HIGH_TIMEOUT, self.version_cap) + guest_api.AGENT_HIGH_TIMEOUT, + version=version) def get_node_id_for_removal(self): LOG.debug("Validating cluster node removal.") + version = guest_api.API.API_BASE_VERSION + return self._call("get_node_id_for_removal", - guest_api.AGENT_HIGH_TIMEOUT, self.version_cap) + guest_api.AGENT_HIGH_TIMEOUT, + version=version) def remove_nodes(self, node_ids): LOG.debug("Removing nodes from cluster.") + version = guest_api.API.API_BASE_VERSION + return self._call("remove_nodes", guest_api.AGENT_HIGH_TIMEOUT, - self.version_cap, node_ids=node_ids) + version=version, node_ids=node_ids) def cluster_meet(self, ip, port): LOG.debug("Joining node to cluster.") + version = guest_api.API.API_BASE_VERSION + return self._call("cluster_meet", guest_api.AGENT_HIGH_TIMEOUT, - self.version_cap, ip=ip, port=port) + version=version, ip=ip, port=port) def cluster_addslots(self, first_slot, last_slot): LOG.debug("Adding slots %s-%s to cluster.", first_slot, last_slot) + version = guest_api.API.API_BASE_VERSION + return self._call("cluster_addslots", - guest_api.AGENT_HIGH_TIMEOUT, self.version_cap, + guest_api.AGENT_HIGH_TIMEOUT, + version=version, first_slot=first_slot, last_slot=last_slot) def cluster_complete(self): LOG.debug("Notifying cluster install completion.") + version = guest_api.API.API_BASE_VERSION + return self._call("cluster_complete", guest_api.AGENT_HIGH_TIMEOUT, - self.version_cap) + version=version) diff --git a/trove/common/strategies/cluster/experimental/vertica/guestagent.py b/trove/common/strategies/cluster/experimental/vertica/guestagent.py index 33c75ff551..ef6354b4ca 100644 --- a/trove/common/strategies/cluster/experimental/vertica/guestagent.py +++ b/trove/common/strategies/cluster/experimental/vertica/guestagent.py @@ -30,39 +30,64 @@ class VerticaGuestAgentStrategy(base.BaseGuestAgentStrategy): class VerticaGuestAgentAPI(guest_api.API): + """Cluster Specific Datastore Guest API + + **** VERSION CONTROLLED API **** + + The methods in this class are subject to version control as + coordinated by guestagent/api.py. Whenever a change is made to + any API method in this class, add a version number and comment + to the top of guestagent/api.py and use the version number as + appropriate in this file + """ def get_public_keys(self, user): LOG.debug("Getting public keys for user: %s." % user) + version = guest_api.API.API_BASE_VERSION + return self._call("get_public_keys", guest_api.AGENT_HIGH_TIMEOUT, - self.version_cap, user=user) + version=version, user=user) def authorize_public_keys(self, user, public_keys): LOG.debug("Authorizing public keys for user: %s." % user) + version = guest_api.API.API_BASE_VERSION + return self._call("authorize_public_keys", - guest_api.AGENT_HIGH_TIMEOUT, self.version_cap, + guest_api.AGENT_HIGH_TIMEOUT, + version=version, user=user, public_keys=public_keys) def install_cluster(self, members): LOG.debug("Installing Vertica cluster on members: %s." % members) + version = guest_api.API.API_BASE_VERSION + return self._call("install_cluster", CONF.cluster_usage_timeout, - self.version_cap, members=members) + version=version, members=members) def grow_cluster(self, members): LOG.debug("Growing Vertica cluster with members: %s." % members) + version = guest_api.API.API_BASE_VERSION + return self._call("grow_cluster", CONF.cluster_usage_timeout, - self.version_cap, members=members) + version=version, members=members) def shrink_cluster(self, members): LOG.debug("Shrinking Vertica cluster with members: %s." % members) + version = guest_api.API.API_BASE_VERSION + return self._call("shrink_cluster", CONF.cluster_usage_timeout, - self.version_cap, members=members) + version=version, members=members) def mark_design_ksafe(self, k): LOG.debug("Setting vertica k-safety level to : %s." % k) + version = guest_api.API.API_BASE_VERSION + return self._call("mark_design_ksafe", CONF.cluster_usage_timeout, - self.version_cap, k=k) + version=version, k=k) def cluster_complete(self): LOG.debug("Notifying cluster install completion.") + version = guest_api.API.API_BASE_VERSION + return self._call("cluster_complete", guest_api.AGENT_HIGH_TIMEOUT, - self.version_cap) + version=version) diff --git a/trove/conductor/api.py b/trove/conductor/api.py index d83aef5c73..757416b225 100644 --- a/trove/conductor/api.py +++ b/trove/conductor/api.py @@ -16,28 +16,50 @@ from oslo_log import log as logging import oslo_messaging as messaging from trove.common import cfg -from trove.common.rpc import version as rpc_version from trove.common.serializable_notification import SerializableNotification from trove import rpc - CONF = cfg.CONF LOG = logging.getLogger(__name__) class API(object): - """API for interacting with trove conductor.""" + """API for interacting with trove conductor. + + API version history: + * 1.0 - Initial version. + + When updating this API, also update API_LATEST_VERSION + """ + + # API_LATEST_VERSION should bump the minor number each time + # a method signature is added or changed + API_LATEST_VERSION = '1.0' + + # API_BASE_VERSION should only change on major version upgrade + API_BASE_VERSION = '1.0' + + VERSION_ALIASES = { + 'icehouse': '1.0', + 'juno': '1.0', + 'kilo': '1.0', + 'liberty': '1.0', + 'mitaka': '1.0', + 'newton': '1.0', + + 'latest': API_LATEST_VERSION + } def __init__(self, context): self.context = context super(API, self).__init__() + version_cap = self.VERSION_ALIASES.get( + CONF.upgrade_levels.conductor, CONF.upgrade_levels.conductor) target = messaging.Target(topic=CONF.conductor_queue, - version=rpc_version.RPC_API_VERSION) + version=version_cap) - self.version_cap = rpc_version.VERSION_ALIASES.get( - CONF.upgrade_levels.conductor) - self.client = self.get_client(target, self.version_cap) + self.client = self.get_client(target, version_cap) def get_client(self, target, version_cap, serializer=None): return rpc.get_client(target, @@ -47,8 +69,9 @@ class API(object): def heartbeat(self, instance_id, payload, sent=None): LOG.debug("Making async call to cast heartbeat for instance: %s" % instance_id) + version = self.API_BASE_VERSION - cctxt = self.client.prepare(version=self.version_cap) + cctxt = self.client.prepare(version=version) cctxt.cast(self.context, "heartbeat", instance_id=instance_id, sent=sent, @@ -58,8 +81,9 @@ class API(object): **backup_fields): LOG.debug("Making async call to cast update_backup for instance: %s" % instance_id) + version = self.API_BASE_VERSION - cctxt = self.client.prepare(version=self.version_cap) + cctxt = self.client.prepare(version=version) cctxt.cast(self.context, "update_backup", instance_id=instance_id, backup_id=backup_id, @@ -69,14 +93,16 @@ class API(object): def report_root(self, instance_id, user): LOG.debug("Making async call to cast report_root for instance: %s" % instance_id) - cctxt = self.client.prepare(version=self.version_cap) + version = self.API_BASE_VERSION + cctxt = self.client.prepare(version=version) cctxt.cast(self.context, "report_root", instance_id=instance_id, user=user) def notify_end(self, **notification_args): LOG.debug("Making async call to cast end notification") - cctxt = self.client.prepare(version=self.version_cap) + version = self.API_BASE_VERSION + cctxt = self.client.prepare(version=version) context = self.context serialized = SerializableNotification.serialize(context, context.notification) @@ -86,7 +112,8 @@ class API(object): def notify_exc_info(self, message, exception): LOG.debug("Making async call to cast error notification") - cctxt = self.client.prepare(version=self.version_cap) + version = self.API_BASE_VERSION + cctxt = self.client.prepare(version=version) context = self.context serialized = SerializableNotification.serialize(context, context.notification) diff --git a/trove/guestagent/api.py b/trove/guestagent/api.py index 43e46f6b6e..180388a07d 100644 --- a/trove/guestagent/api.py +++ b/trove/guestagent/api.py @@ -26,7 +26,6 @@ from trove.common import cfg from trove.common import exception from trove.common.i18n import _ from trove.common.notification import NotificationCastWrapper -import trove.common.rpc.version as rpc_version from trove import rpc CONF = cfg.CONF @@ -37,19 +36,43 @@ AGENT_SNAPSHOT_TIMEOUT = CONF.agent_replication_snapshot_timeout class API(object): - """API for interacting with the guest manager.""" + """API for interacting with the guest manager. + + API version history: + * 1.0 - Initial version. + + When updating this API, also update API_LATEST_VERSION + """ + + # API_LATEST_VERSION should bump the minor number each time + # a method signature is added or changed + API_LATEST_VERSION = '1.0' + + # API_BASE_VERSION should only change on major version upgrade + API_BASE_VERSION = '1.0' + + VERSION_ALIASES = { + 'icehouse': '1.0', + 'juno': '1.0', + 'kilo': '1.0', + 'liberty': '1.0', + 'mitaka': '1.0', + 'newton': '1.0', + + 'latest': API_LATEST_VERSION + } def __init__(self, context, id): self.context = context self.id = id super(API, self).__init__() + version_cap = self.VERSION_ALIASES.get( + CONF.upgrade_levels.guestagent, CONF.upgrade_levels.guestagent) target = messaging.Target(topic=self._get_routing_key(), - version=rpc_version.RPC_API_VERSION) + version=version_cap) - self.version_cap = rpc_version.VERSION_ALIASES.get( - CONF.upgrade_levels.guestagent) - self.client = self.get_client(target, self.version_cap) + self.client = self.get_client(target, version_cap) def get_client(self, target, version_cap, serializer=None): return rpc.get_client(target, @@ -95,31 +118,44 @@ class API(object): users. """ LOG.debug("Changing passwords for users on instance %s.", self.id) - self._cast("change_passwords", self.version_cap, users=users) + version = self.API_BASE_VERSION + + self._cast("change_passwords", version=version, users=users) def update_attributes(self, username, hostname, user_attrs): """Update user attributes.""" LOG.debug("Changing user attributes on instance %s.", self.id) - self._cast("update_attributes", self.version_cap, username=username, + version = self.API_BASE_VERSION + + self._cast("update_attributes", + version=version, username=username, hostname=hostname, user_attrs=user_attrs) def create_user(self, users): """Make an asynchronous call to create a new database user""" LOG.debug("Creating Users for instance %s.", self.id) - self._cast("create_user", self.version_cap, users=users) + version = self.API_BASE_VERSION + + self._cast("create_user", version=version, users=users) def get_user(self, username, hostname): """Make an asynchronous call to get a single database user.""" LOG.debug("Getting a user %(username)s on instance %(id)s.", {'username': username, 'id': self.id}) - return self._call("get_user", AGENT_LOW_TIMEOUT, self.version_cap, + version = self.API_BASE_VERSION + + return self._call("get_user", + AGENT_LOW_TIMEOUT, version=version, username=username, hostname=hostname) def list_access(self, username, hostname): """Show all the databases to which a user has more than USAGE.""" LOG.debug("Showing user %(username)s grants on instance %(id)s.", {'username': username, 'id': self.id}) - return self._call("list_access", AGENT_LOW_TIMEOUT, self.version_cap, + version = self.API_BASE_VERSION + + return self._call("list_access", + AGENT_LOW_TIMEOUT, version=version, username=username, hostname=hostname) def grant_access(self, username, hostname, databases): @@ -128,7 +164,10 @@ class API(object): "%(username)s on instance %(id)s.", {'username': username, 'databases': databases, 'id': self.id}) - return self._call("grant_access", AGENT_LOW_TIMEOUT, self.version_cap, + version = self.API_BASE_VERSION + + return self._call("grant_access", + AGENT_LOW_TIMEOUT, version=version, username=username, hostname=hostname, databases=databases) @@ -138,14 +177,20 @@ class API(object): "%(username)s on instance %(id)s.", {'username': username, 'database': database, 'id': self.id}) - return self._call("revoke_access", AGENT_LOW_TIMEOUT, self.version_cap, + version = self.API_BASE_VERSION + + return self._call("revoke_access", + AGENT_LOW_TIMEOUT, version=version, username=username, hostname=hostname, database=database) def list_users(self, limit=None, marker=None, include_marker=False): """Make an asynchronous call to list database users.""" LOG.debug("Listing Users for instance %s.", self.id) - return self._call("list_users", AGENT_HIGH_TIMEOUT, self.version_cap, + version = self.API_BASE_VERSION + + return self._call("list_users", AGENT_HIGH_TIMEOUT, + version=version, limit=limit, marker=marker, include_marker=include_marker) @@ -153,20 +198,27 @@ class API(object): """Make an asynchronous call to delete an existing database user.""" LOG.debug("Deleting user %(user)s for instance %(instance_id)s." % {'user': user, 'instance_id': self.id}) - self._cast("delete_user", self.version_cap, user=user) + version = self.API_BASE_VERSION + + self._cast("delete_user", version=version, user=user) def create_database(self, databases): """Make an asynchronous call to create a new database within the specified container """ LOG.debug("Creating databases for instance %s.", self.id) - self._cast("create_database", self.version_cap, databases=databases) + version = self.API_BASE_VERSION + + self._cast("create_database", version=version, + databases=databases) def list_databases(self, limit=None, marker=None, include_marker=False): """Make an asynchronous call to list databases.""" LOG.debug("Listing databases for instance %s.", self.id) + version = self.API_BASE_VERSION + return self._call("list_databases", AGENT_LOW_TIMEOUT, - self.version_cap, limit=limit, marker=marker, + version=version, limit=limit, marker=marker, include_marker=include_marker) def delete_database(self, database): @@ -176,53 +228,72 @@ class API(object): LOG.debug("Deleting database %(database)s for " "instance %(instance_id)s." % {'database': database, 'instance_id': self.id}) - self._cast("delete_database", self.version_cap, database=database) + version = self.API_BASE_VERSION + + self._cast("delete_database", version=version, database=database) def enable_root(self): """Make a synchronous call to enable the root user for access from anywhere """ LOG.debug("Enable root user for instance %s.", self.id) - return self._call("enable_root", AGENT_HIGH_TIMEOUT, self.version_cap) + version = self.API_BASE_VERSION + + return self._call("enable_root", AGENT_HIGH_TIMEOUT, + version=version) def enable_root_with_password(self, root_password=None): """Make a synchronous call to enable the root user for access from anywhere """ LOG.debug("Enable root user for instance %s.", self.id) + version = self.API_BASE_VERSION + return self._call("enable_root_with_password", AGENT_HIGH_TIMEOUT, - self.version_cap, root_password=root_password) + version=version, root_password=root_password) def disable_root(self): """Make a synchronous call to disable the root user for access from anywhere """ LOG.debug("Disable root user for instance %s.", self.id) - return self._call("disable_root", AGENT_LOW_TIMEOUT, self.version_cap) + version = self.API_BASE_VERSION + + return self._call("disable_root", AGENT_LOW_TIMEOUT, + version=version) def is_root_enabled(self): """Make a synchronous call to check if root access is available for the container """ LOG.debug("Check root access for instance %s.", self.id) + version = self.API_BASE_VERSION + return self._call("is_root_enabled", AGENT_LOW_TIMEOUT, - self.version_cap) + version=version) def get_hwinfo(self): """Make a synchronous call to get hardware info for the container""" LOG.debug("Check hwinfo on instance %s.", self.id) - return self._call("get_hwinfo", AGENT_LOW_TIMEOUT, self.version_cap) + version = self.API_BASE_VERSION + + return self._call("get_hwinfo", AGENT_LOW_TIMEOUT, + version=version) def get_diagnostics(self): """Make a synchronous call to get diagnostics for the container""" LOG.debug("Check diagnostics on instance %s.", self.id) + version = self.API_BASE_VERSION + return self._call("get_diagnostics", AGENT_LOW_TIMEOUT, - self.version_cap) + version=version) def rpc_ping(self): """Make a synchronous RPC call to check if we can ping the instance.""" LOG.debug("Check RPC ping on instance %s.", self.id) - return self._call("rpc_ping", AGENT_LOW_TIMEOUT, self.version_cap) + version = self.API_BASE_VERSION + + return self._call("rpc_ping", AGENT_LOW_TIMEOUT, version=version) def prepare(self, memory_mb, packages, databases, users, device_path='/dev/vdb', mount_point='/mnt/volume', @@ -234,6 +305,8 @@ class API(object): """ LOG.debug("Sending the call to prepare the Guest.") + version = self.API_BASE_VERSION + # Taskmanager is a publisher, guestagent is a consumer. Usually # consumer creates a queue, but in this case we have to make sure # "prepare" doesn't get lost if for some reason guest was delayed and @@ -242,7 +315,7 @@ class API(object): packages = packages.split() self._cast( - "prepare", self.version_cap, packages=packages, + "prepare", version=version, packages=packages, databases=databases, memory_mb=memory_mb, users=users, device_path=device_path, mount_point=mount_point, backup_info=backup_info, config_contents=config_contents, @@ -258,7 +331,7 @@ class API(object): server = None target = messaging.Target(topic=self._get_routing_key(), server=self.id, - version=rpc_version.RPC_API_VERSION) + version=self.API_BASE_VERSION) try: server = rpc.get_server(target, []) server.start() @@ -270,26 +343,35 @@ class API(object): def pre_upgrade(self): """Prepare the guest for upgrade.""" LOG.debug("Sending the call to prepare the guest for upgrade.") - return self._call("pre_upgrade", AGENT_HIGH_TIMEOUT, self.version_cap) + version = self.API_BASE_VERSION + + return self._call("pre_upgrade", AGENT_HIGH_TIMEOUT, + version=version) def post_upgrade(self, upgrade_info): """Recover the guest after upgrading the guest's image.""" LOG.debug("Recover the guest after upgrading the guest's image.") - self._call("post_upgrade", AGENT_HIGH_TIMEOUT, self.version_cap, + version = self.API_BASE_VERSION + + self._call("post_upgrade", AGENT_HIGH_TIMEOUT, version=version, upgrade_info=upgrade_info) def restart(self): """Restart the database server.""" LOG.debug("Sending the call to restart the database process " "on the Guest.") - self._call("restart", AGENT_HIGH_TIMEOUT, self.version_cap) + version = self.API_BASE_VERSION + + self._call("restart", AGENT_HIGH_TIMEOUT, version=version) def start_db_with_conf_changes(self, config_contents): """Start the database server.""" LOG.debug("Sending the call to start the database process on " "the Guest with a timeout of %s." % AGENT_HIGH_TIMEOUT) + version = self.API_BASE_VERSION + self._call("start_db_with_conf_changes", AGENT_HIGH_TIMEOUT, - self.version_cap, config_contents=config_contents) + version=version, config_contents=config_contents) def reset_configuration(self, configuration): """Ignore running state of the database server; just change @@ -297,20 +379,26 @@ class API(object): """ LOG.debug("Sending the call to change the database conf file on the " "Guest with a timeout of %s." % AGENT_HIGH_TIMEOUT) + version = self.API_BASE_VERSION + self._call("reset_configuration", AGENT_HIGH_TIMEOUT, - self.version_cap, configuration=configuration) + version=version, configuration=configuration) def stop_db(self, do_not_start_on_reboot=False): """Stop the database server.""" LOG.debug("Sending the call to stop the database process " "on the Guest.") - self._call("stop_db", AGENT_HIGH_TIMEOUT, self.version_cap, + version = self.API_BASE_VERSION + + self._call("stop_db", AGENT_HIGH_TIMEOUT, version=version, do_not_start_on_reboot=do_not_start_on_reboot) def upgrade(self, instance_version, location, metadata=None): """Make an asynchronous call to self upgrade the guest agent.""" LOG.debug("Sending an upgrade call to nova-guest.") - self._cast("upgrade", self.version_cap, + version = self.API_BASE_VERSION + + self._cast("upgrade", version=version, instance_version=instance_version, location=location, metadata=metadata) @@ -318,158 +406,214 @@ class API(object): def get_volume_info(self): """Make a synchronous call to get volume info for the container.""" LOG.debug("Check Volume Info on instance %s.", self.id) + version = self.API_BASE_VERSION + return self._call("get_filesystem_stats", AGENT_LOW_TIMEOUT, - self.version_cap, fs_path=None) + version=version, fs_path=None) def update_guest(self): """Make a synchronous call to update the guest agent.""" LOG.debug("Updating guest agent on instance %s.", self.id) - self._call("update_guest", AGENT_HIGH_TIMEOUT, self.version_cap) + version = self.API_BASE_VERSION + + self._call("update_guest", AGENT_HIGH_TIMEOUT, version=version) def create_backup(self, backup_info): """Make async call to create a full backup of this instance.""" LOG.debug("Create Backup %(backup_id)s " "for instance %(instance_id)s." % {'backup_id': backup_info['id'], 'instance_id': self.id}) - self._cast("create_backup", self.version_cap, backup_info=backup_info) + version = self.API_BASE_VERSION + + self._cast("create_backup", version=version, + backup_info=backup_info) def mount_volume(self, device_path=None, mount_point=None): """Mount the volume.""" LOG.debug("Mount volume %(mount)s on instance %(id)s." % { 'mount': mount_point, 'id': self.id}) - self._call("mount_volume", AGENT_LOW_TIMEOUT, self.version_cap, + version = self.API_BASE_VERSION + + self._call("mount_volume", AGENT_LOW_TIMEOUT, version=version, device_path=device_path, mount_point=mount_point) def unmount_volume(self, device_path=None, mount_point=None): """Unmount the volume.""" LOG.debug("Unmount volume %(device)s on instance %(id)s." % { 'device': device_path, 'id': self.id}) - self._call("unmount_volume", AGENT_LOW_TIMEOUT, self.version_cap, + version = self.API_BASE_VERSION + + self._call("unmount_volume", AGENT_LOW_TIMEOUT, version=version, device_path=device_path, mount_point=mount_point) def resize_fs(self, device_path=None, mount_point=None): """Resize the filesystem.""" LOG.debug("Resize device %(device)s on instance %(id)s." % { 'device': device_path, 'id': self.id}) - self._call("resize_fs", AGENT_HIGH_TIMEOUT, self.version_cap, + version = self.API_BASE_VERSION + + self._call("resize_fs", AGENT_HIGH_TIMEOUT, version=version, device_path=device_path, mount_point=mount_point) def update_overrides(self, overrides, remove=False): """Update the overrides.""" LOG.debug("Updating overrides values %(overrides)s on instance " "%(id)s.", {'overrides': overrides, 'id': self.id}) + version = self.API_BASE_VERSION + self._call("update_overrides", AGENT_HIGH_TIMEOUT, - self.version_cap, overrides=overrides, remove=remove) + version=version, overrides=overrides, remove=remove) def apply_overrides(self, overrides): LOG.debug("Applying overrides values %(overrides)s on instance " "%(id)s.", {'overrides': overrides, 'id': self.id}) - self._call("apply_overrides", AGENT_HIGH_TIMEOUT, self.version_cap, - overrides=overrides) + version = self.API_BASE_VERSION + + self._call("apply_overrides", AGENT_HIGH_TIMEOUT, + version=version, overrides=overrides) def backup_required_for_replication(self): LOG.debug("Checking backup requirement for replication") + version = self.API_BASE_VERSION + return self._call("backup_required_for_replication", AGENT_LOW_TIMEOUT, - self.version_cap) + version=version) def get_replication_snapshot(self, snapshot_info=None, replica_source_config=None): LOG.debug("Retrieving replication snapshot from instance %s.", self.id) + version = self.API_BASE_VERSION + return self._call("get_replication_snapshot", AGENT_SNAPSHOT_TIMEOUT, - self.version_cap, snapshot_info=snapshot_info, + version=version, snapshot_info=snapshot_info, replica_source_config=replica_source_config) def attach_replication_slave(self, snapshot, replica_config=None): LOG.debug("Configuring instance %s to replicate from %s.", self.id, snapshot.get('master').get('id')) - self._cast("attach_replication_slave", self.version_cap, + version = self.API_BASE_VERSION + + self._cast("attach_replication_slave", version=version, snapshot=snapshot, slave_config=replica_config) def detach_replica(self, for_failover=False): LOG.debug("Detaching replica %s from its replication source.", self.id) + version = self.API_BASE_VERSION + return self._call("detach_replica", AGENT_HIGH_TIMEOUT, - self.version_cap, for_failover=for_failover) + version=version, for_failover=for_failover) def get_replica_context(self): LOG.debug("Getting replica context.") + version = self.API_BASE_VERSION + return self._call("get_replica_context", - AGENT_HIGH_TIMEOUT, self.version_cap) + AGENT_HIGH_TIMEOUT, version=version) def attach_replica(self, replica_info, slave_config): LOG.debug("Attaching replica %s." % replica_info) - self._call("attach_replica", AGENT_HIGH_TIMEOUT, self.version_cap, + version = self.API_BASE_VERSION + + self._call("attach_replica", AGENT_HIGH_TIMEOUT, version=version, replica_info=replica_info, slave_config=slave_config) def make_read_only(self, read_only): LOG.debug("Executing make_read_only(%s)" % read_only) - self._call("make_read_only", AGENT_HIGH_TIMEOUT, self.version_cap, + version = self.API_BASE_VERSION + + self._call("make_read_only", AGENT_HIGH_TIMEOUT, version=version, read_only=read_only) def enable_as_master(self, replica_source_config): LOG.debug("Executing enable_as_master") - self._call("enable_as_master", AGENT_HIGH_TIMEOUT, self.version_cap, + version = self.API_BASE_VERSION + + self._call("enable_as_master", AGENT_HIGH_TIMEOUT, + version=version, replica_source_config=replica_source_config) # DEPRECATED: Maintain for API Compatibility def get_txn_count(self): LOG.debug("Executing get_txn_count.") + version = self.API_BASE_VERSION + return self._call("get_txn_count", - AGENT_HIGH_TIMEOUT, self.version_cap) + AGENT_HIGH_TIMEOUT, version=version) def get_last_txn(self): LOG.debug("Executing get_last_txn.") + version = self.API_BASE_VERSION + return self._call("get_last_txn", - AGENT_HIGH_TIMEOUT, self.version_cap) + AGENT_HIGH_TIMEOUT, version=version) def get_latest_txn_id(self): LOG.debug("Executing get_latest_txn_id.") + version = self.API_BASE_VERSION + return self._call("get_latest_txn_id", - AGENT_HIGH_TIMEOUT, self.version_cap) + AGENT_HIGH_TIMEOUT, version=version) def wait_for_txn(self, txn): LOG.debug("Executing wait_for_txn.") - self._call("wait_for_txn", AGENT_HIGH_TIMEOUT, self.version_cap, + version = self.API_BASE_VERSION + + self._call("wait_for_txn", AGENT_HIGH_TIMEOUT, version=version, txn=txn) def cleanup_source_on_replica_detach(self, replica_info): LOG.debug("Cleaning up master %s on detach of replica.", self.id) + version = self.API_BASE_VERSION + self._call("cleanup_source_on_replica_detach", AGENT_HIGH_TIMEOUT, - self.version_cap, replica_info=replica_info) + version=version, replica_info=replica_info) def demote_replication_master(self): LOG.debug("Demoting instance %s to non-master.", self.id) + version = self.API_BASE_VERSION + self._call("demote_replication_master", AGENT_HIGH_TIMEOUT, - self.version_cap) + version=version) def guest_log_list(self): LOG.debug("Retrieving guest log list for %s.", self.id) + version = self.API_BASE_VERSION + result = self._call("guest_log_list", AGENT_HIGH_TIMEOUT, - self.version_cap) + version=version) LOG.debug("guest_log_list returns %s", result) return result def guest_log_action(self, log_name, enable, disable, publish, discard): LOG.debug("Processing guest log '%s' for %s.", log_name, self.id) + version = self.API_BASE_VERSION + return self._call("guest_log_action", AGENT_HIGH_TIMEOUT, - self.version_cap, log_name=log_name, + version=version, log_name=log_name, enable=enable, disable=disable, publish=publish, discard=discard) def module_list(self, include_contents): LOG.debug("Querying modules on %s (contents: %s).", self.id, include_contents) + version = self.API_BASE_VERSION + result = self._call("module_list", AGENT_HIGH_TIMEOUT, - self.version_cap, + version=version, include_contents=include_contents) return result def module_apply(self, modules): LOG.debug("Applying modules to %s.", self.id) + version = self.API_BASE_VERSION + return self._call("module_apply", AGENT_HIGH_TIMEOUT, - self.version_cap, modules=modules) + version=version, modules=modules) def module_remove(self, module): LOG.debug("Removing modules from %s.", self.id) + version = self.API_BASE_VERSION + return self._call("module_remove", AGENT_HIGH_TIMEOUT, - self.version_cap, module=module) + version=version, module=module) diff --git a/trove/taskmanager/api.py b/trove/taskmanager/api.py index 881574a220..1c1b01aa7f 100644 --- a/trove/taskmanager/api.py +++ b/trove/taskmanager/api.py @@ -24,7 +24,6 @@ import oslo_messaging as messaging from trove.common import cfg from trove.common import exception from trove.common.notification import NotificationCastWrapper -import trove.common.rpc.version as rpc_version from trove.common.strategies.cluster import strategy from trove.guestagent import models as agent_models from trove import rpc @@ -34,18 +33,42 @@ LOG = logging.getLogger(__name__) class API(object): - """API for interacting with the task manager.""" + """API for interacting with the task manager. + + API version history: + * 1.0 - Initial version. + + When updating this API, also update API_LATEST_VERSION + """ + + # API_LATEST_VERSION should bump the minor number each time + # a method signature is added or changed + API_LATEST_VERSION = '1.0' + + # API_BASE_VERSION should only change on major version upgrade + API_BASE_VERSION = '1.0' + + VERSION_ALIASES = { + 'icehouse': '1.0', + 'juno': '1.0', + 'kilo': '1.0', + 'liberty': '1.0', + 'mitaka': '1.0', + 'newton': '1.0', + + 'latest': API_LATEST_VERSION + } def __init__(self, context): self.context = context super(API, self).__init__() + version_cap = self.VERSION_ALIASES.get( + CONF.upgrade_levels.taskmanager, CONF.upgrade_levels.taskmanager) target = messaging.Target(topic=CONF.taskmanager_queue, - version=rpc_version.RPC_API_VERSION) + version=version_cap) - self.version_cap = rpc_version.VERSION_ALIASES.get( - CONF.upgrade_levels.taskmanager) - self.client = self.get_client(target, self.version_cap) + self.client = self.get_client(target, version_cap) def _cast(self, method_name, version, **kwargs): LOG.debug("Casting %s" % method_name) @@ -79,72 +102,83 @@ class API(object): def resize_volume(self, new_size, instance_id): LOG.debug("Making async call to resize volume for instance: %s" % instance_id) + version = self.API_BASE_VERSION - self._cast("resize_volume", self.version_cap, + self._cast("resize_volume", version=version, new_size=new_size, instance_id=instance_id) def resize_flavor(self, instance_id, old_flavor, new_flavor): LOG.debug("Making async call to resize flavor for instance: %s" % instance_id) + version = self.API_BASE_VERSION - self._cast("resize_flavor", self.version_cap, + self._cast("resize_flavor", version=version, instance_id=instance_id, old_flavor=self._transform_obj(old_flavor), new_flavor=self._transform_obj(new_flavor)) def reboot(self, instance_id): LOG.debug("Making async call to reboot instance: %s" % instance_id) + version = self.API_BASE_VERSION - self._cast("reboot", self.version_cap, instance_id=instance_id) + self._cast("reboot", version=version, instance_id=instance_id) def restart(self, instance_id): LOG.debug("Making async call to restart instance: %s" % instance_id) + version = self.API_BASE_VERSION - self._cast("restart", self.version_cap, instance_id=instance_id) + self._cast("restart", version=version, instance_id=instance_id) def detach_replica(self, instance_id): LOG.debug("Making async call to detach replica: %s" % instance_id) + version = self.API_BASE_VERSION - self._cast("detach_replica", self.version_cap, + self._cast("detach_replica", version=version, instance_id=instance_id) def promote_to_replica_source(self, instance_id): LOG.debug("Making async call to promote replica to source: %s" % instance_id) - self._cast("promote_to_replica_source", self.version_cap, + version = self.API_BASE_VERSION + self._cast("promote_to_replica_source", version=version, instance_id=instance_id) def eject_replica_source(self, instance_id): LOG.debug("Making async call to eject replica source: %s" % instance_id) - self._cast("eject_replica_source", self.version_cap, + version = self.API_BASE_VERSION + self._cast("eject_replica_source", version=version, instance_id=instance_id) def migrate(self, instance_id, host): LOG.debug("Making async call to migrate instance: %s" % instance_id) + version = self.API_BASE_VERSION - self._cast("migrate", self.version_cap, + self._cast("migrate", version=version, instance_id=instance_id, host=host) def delete_instance(self, instance_id): LOG.debug("Making async call to delete instance: %s" % instance_id) + version = self.API_BASE_VERSION - self._cast("delete_instance", self.version_cap, + self._cast("delete_instance", version=version, instance_id=instance_id) def create_backup(self, backup_info, instance_id): LOG.debug("Making async call to create a backup for instance: %s" % instance_id) + version = self.API_BASE_VERSION - self._cast("create_backup", self.version_cap, + self._cast("create_backup", version=version, backup_info=backup_info, instance_id=instance_id) def delete_backup(self, backup_id): LOG.debug("Making async call to delete backup: %s" % backup_id) + version = self.API_BASE_VERSION - self._cast("delete_backup", self.version_cap, backup_id=backup_id) + self._cast("delete_backup", version=version, backup_id=backup_id) def create_instance(self, instance_id, name, flavor, image_id, databases, users, datastore_manager, @@ -155,7 +189,8 @@ class API(object): modules=None, locality=None): LOG.debug("Making async call to create instance %s " % instance_id) - self._cast("create_instance", self.version_cap, + version = self.API_BASE_VERSION + self._cast("create_instance", version=version, instance_id=instance_id, name=name, flavor=self._transform_obj(flavor), image_id=image_id, @@ -176,33 +211,38 @@ class API(object): def create_cluster(self, cluster_id): LOG.debug("Making async call to create cluster %s " % cluster_id) + version = self.API_BASE_VERSION - self._cast("create_cluster", self.version_cap, cluster_id=cluster_id) + self._cast("create_cluster", version=version, cluster_id=cluster_id) def grow_cluster(self, cluster_id, new_instance_ids): LOG.debug("Making async call to grow cluster %s " % cluster_id) + version = self.API_BASE_VERSION - cctxt = self.client.prepare(version=self.version_cap) + cctxt = self.client.prepare(version=version) cctxt.cast(self.context, "grow_cluster", cluster_id=cluster_id, new_instance_ids=new_instance_ids) def shrink_cluster(self, cluster_id, instance_ids): LOG.debug("Making async call to shrink cluster %s " % cluster_id) + version = self.API_BASE_VERSION - cctxt = self.client.prepare(version=self.version_cap) + cctxt = self.client.prepare(version=version) cctxt.cast(self.context, "shrink_cluster", cluster_id=cluster_id, instance_ids=instance_ids) def delete_cluster(self, cluster_id): LOG.debug("Making async call to delete cluster %s " % cluster_id) + version = self.API_BASE_VERSION - self._cast("delete_cluster", self.version_cap, cluster_id=cluster_id) + self._cast("delete_cluster", version=version, cluster_id=cluster_id) def upgrade(self, instance_id, datastore_version_id): LOG.debug("Making async call to upgrade guest to datastore " "version %s " % datastore_version_id) + version = self.API_BASE_VERSION - cctxt = self.client.prepare(version=self.version_cap) + cctxt = self.client.prepare(version=version) cctxt.cast(self.context, "upgrade", instance_id=instance_id, datastore_version_id=datastore_version_id) diff --git a/trove/taskmanager/manager.py b/trove/taskmanager/manager.py index a70872b723..81feaf0d20 100644 --- a/trove/taskmanager/manager.py +++ b/trove/taskmanager/manager.py @@ -14,7 +14,6 @@ # under the License. from oslo_log import log as logging -import oslo_messaging as messaging from oslo_service import periodic_task from oslo_utils import importutils @@ -27,7 +26,6 @@ from trove.common.exception import TroveError from trove.common.i18n import _ from trove.common.notification import DBaaSQuotas, EndNotification from trove.common import remote -import trove.common.rpc.version as rpc_version from trove.common import server_group as srv_grp from trove.common.strategies.cluster import strategy from trove.datastore.models import DatastoreVersion @@ -43,8 +41,6 @@ CONF = cfg.CONF class Manager(periodic_task.PeriodicTasks): - target = messaging.Target(version=rpc_version.RPC_API_VERSION) - def __init__(self): super(Manager, self).__init__(CONF) self.admin_context = TroveContext(