From 7b1fe23039391a187f0fc408f7d64787bad4c585 Mon Sep 17 00:00:00 2001 From: daniel-a-nguyen Date: Wed, 19 Mar 2014 18:14:53 -0700 Subject: [PATCH] Add a new column and indexes to agent_heartbeats Includes a new index on instance_id Added a deleted and deleted_at column Includes an index for deleted too Includes in call to find all by version Includes call in _delete_resources() Fixed AgentHeartBeat delete call Corrected downgrade script as per SlickNik Needs the save method because of how we are calling create Reworked migration to create table from scratch Renamed migration from 026 --> 027 Exception handling, added not null to updated_at col Renamed migration from 027 --> 028 Change-Id: Ie0568850856c32fdbf9495c496339730d6b5beef Implements: blueprint upgrade-guestagent --- .../versions/028_recreate_agent_heartbeat.py | 81 +++++++ trove/guestagent/models.py | 29 ++- trove/instance/models.py | 1 + trove/taskmanager/api.py | 11 + .../test_agent_heartbeats_models.py | 209 ++++++++++++++++++ 5 files changed, 330 insertions(+), 1 deletion(-) create mode 100644 trove/db/sqlalchemy/migrate_repo/versions/028_recreate_agent_heartbeat.py create mode 100644 trove/tests/unittests/guestagent/test_agent_heartbeats_models.py diff --git a/trove/db/sqlalchemy/migrate_repo/versions/028_recreate_agent_heartbeat.py b/trove/db/sqlalchemy/migrate_repo/versions/028_recreate_agent_heartbeat.py new file mode 100644 index 0000000000..1e2acf9296 --- /dev/null +++ b/trove/db/sqlalchemy/migrate_repo/versions/028_recreate_agent_heartbeat.py @@ -0,0 +1,81 @@ +# Copyright 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from sqlalchemy.exc import OperationalError +from sqlalchemy.schema import Column +from sqlalchemy.schema import MetaData +from trove.db.sqlalchemy.migrate_repo.schema import Boolean +from trove.db.sqlalchemy.migrate_repo.schema import create_tables +from trove.db.sqlalchemy.migrate_repo.schema import DateTime +from trove.db.sqlalchemy.migrate_repo.schema import drop_tables +from trove.db.sqlalchemy.migrate_repo.schema import String +from trove.db.sqlalchemy.migrate_repo.schema import Table +from trove.openstack.common import log as logging + + +logger = logging.getLogger('trove.db.sqlalchemy.migrate_repo.schema') + + +def upgrade(migrate_engine): + meta = MetaData() + meta.bind = migrate_engine + + # new table with desired columns, indexes, and constraints + new_agent_heartbeats = Table( + 'agent_heartbeats', meta, + Column('id', String(36), primary_key=True, nullable=False), + Column('instance_id', String(36), + nullable=False, unique=True, index=True), + Column('guest_agent_version', String(255), index=True), + Column('deleted', Boolean(), index=True), + Column('deleted_at', DateTime()), + Column('updated_at', DateTime(), nullable=False)) + + # original table from migration 005_heartbeat.py + previous_agent_heartbeats = Table('agent_heartbeats', meta, autoload=True) + + try: + drop_tables([previous_agent_heartbeats]) + except OperationalError as e: + logger.warn("This table may have been dropped by some other means.") + logger.warn(e) + + create_tables([new_agent_heartbeats]) + + +def downgrade(migrate_engine): + meta = MetaData() + meta.bind = migrate_engine + + # new table with desired columns, indexes, and constraints + new_agent_heartbeats = Table('agent_heartbeats', meta, autoload=True) + + try: + drop_tables([new_agent_heartbeats]) + except OperationalError as e: + logger.warn("This table may have been dropped by some other means.") + logger.warn(e) + + # reset the migrate_engine + meta = MetaData() + meta.bind = migrate_engine + + # original table from migration 005_heartbeat.py + previous_agent_heartbeats = Table( + 'agent_heartbeats', meta, Column('id', String(36), primary_key=True, + nullable=False), + Column('instance_id', String(36), nullable=False), + Column('updated_at', DateTime()), extend_existing=True) + + create_tables([previous_agent_heartbeats]) diff --git a/trove/guestagent/models.py b/trove/guestagent/models.py index 07ffcac229..7e52d5a307 100644 --- a/trove/guestagent/models.py +++ b/trove/guestagent/models.py @@ -36,7 +36,8 @@ def persisted_models(): class AgentHeartBeat(dbmodels.DatabaseModelBase): """Defines the state of a Guest Agent.""" - _data_fields = ['instance_id', 'updated_at'] + _data_fields = ['instance_id', 'updated_at', 'guest_agent_version', + 'deleted', 'deleted_at'] _table_name = 'agent_heartbeats' def __init__(self, **kwargs): @@ -58,6 +59,32 @@ class AgentHeartBeat(dbmodels.DatabaseModelBase): {'name': self.__class__.__name__, 'dict': self.__dict__}) return get_db_api().save(self) + @classmethod + def find_all_by_version(cls, guest_agent_version, deleted=0): + if guest_agent_version is None: + raise exception.ModelNotFoundError() + + heartbeats = cls.find_all(guest_agent_version=guest_agent_version, + deleted=deleted) + + if heartbeats is None or heartbeats.count() == 0: + raise exception.ModelNotFoundError( + guest_agent_version=guest_agent_version) + + return heartbeats + + @classmethod + def find_by_instance_id(cls, instance_id): + if instance_id is None: + raise exception.ModelNotFoundError(instance_id=instance_id) + + try: + return cls.find_by(instance_id=instance_id) + + except exception.NotFound as e: + LOG.error(e.message) + raise exception.ModelNotFoundError(instance_id=instance_id) + @staticmethod def is_active(agent): return (datetime.now() - agent.updated_at < diff --git a/trove/instance/models.py b/trove/instance/models.py index 1cb4ebdfb3..431dc65cd5 100644 --- a/trove/instance/models.py +++ b/trove/instance/models.py @@ -514,6 +514,7 @@ class BaseInstance(SimpleInstance): _delete_resources) def _delete_resources(self, deleted_at): + """Implemented in subclass.""" pass def delete_async(self): diff --git a/trove/taskmanager/api.py b/trove/taskmanager/api.py index fcf2709507..4edf772fd3 100644 --- a/trove/taskmanager/api.py +++ b/trove/taskmanager/api.py @@ -20,6 +20,8 @@ Routes all the requests to the task manager. from trove.common import cfg +from trove.common import exception +from trove.guestagent import models as agent_models from trove.openstack.common.rpc import proxy from trove.openstack.common import log as logging @@ -51,6 +53,14 @@ class API(proxy.RpcProxy): """Create the routing key for the taskmanager""" return CONF.taskmanager_queue + def _delete_heartbeat(self, instance_id): + agent_heart_beat = agent_models.AgentHeartBeat() + try: + heartbeat = agent_heart_beat.find_by_instance_id(instance_id) + heartbeat.delete() + except exception.ModelNotFoundError as e: + LOG.error(e.message) + def resize_volume(self, new_size, instance_id): LOG.debug("Making async call to resize volume for instance: %s" % instance_id) @@ -86,6 +96,7 @@ class API(proxy.RpcProxy): LOG.debug("Making async call to delete instance: %s" % instance_id) self.cast(self.context, self.make_msg("delete_instance", instance_id=instance_id)) + self._delete_heartbeat(instance_id) def create_backup(self, backup_info, instance_id): LOG.debug("Making async call to create a backup for instance: %s" % diff --git a/trove/tests/unittests/guestagent/test_agent_heartbeats_models.py b/trove/tests/unittests/guestagent/test_agent_heartbeats_models.py new file mode 100644 index 0000000000..6b205793ac --- /dev/null +++ b/trove/tests/unittests/guestagent/test_agent_heartbeats_models.py @@ -0,0 +1,209 @@ +# Copyright 2014 Hewlett-Packard Development Company, L.P. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import testtools +import uuid +from trove.common import exception +from trove.guestagent.models import AgentHeartBeat +from trove.tests.unittests.util import util + + +class AgentHeartBeatTest(testtools.TestCase): + + def setUp(self): + super(AgentHeartBeatTest, self).setUp() + util.init_db() + + def tearDown(self): + super(AgentHeartBeatTest, self).tearDown() + + def test_create(self): + """ + Test the creation of a new agent heartbeat record + """ + instance_id = str(uuid.uuid4()) + heartbeat = AgentHeartBeat.create( + instance_id=instance_id) + self.assertIsNotNone(heartbeat) + + self.assertIsNotNone(heartbeat.id) + self.assertIsNotNone(heartbeat.instance_id) + self.assertEqual(instance_id, + heartbeat.instance_id) + self.assertIsNotNone(heartbeat.updated_at) + self.assertIsNone(heartbeat.guest_agent_version) + + def test_create_with_version(self): + """ + Test the creation of a new agent heartbeat record w/ guest version + """ + instance_id = str(uuid.uuid4()) + heartbeat = AgentHeartBeat.create( + instance_id=instance_id, + guest_agent_version="1.2.3") + self.assertIsNotNone(heartbeat) + + self.assertIsNotNone(heartbeat.id) + self.assertIsNotNone(heartbeat.instance_id) + self.assertEqual(instance_id, + heartbeat.instance_id) + self.assertIsNotNone(heartbeat.updated_at) + self.assertIsNotNone(heartbeat.guest_agent_version) + self.assertEqual("1.2.3", heartbeat.guest_agent_version) + + def test_find_by_instance_id(self): + """ + Test to retrieve a guest agents by it's id + """ + # create a unique record + instance_id = str(uuid.uuid4()) + heartbeat = AgentHeartBeat.create( + instance_id=instance_id, guest_agent_version="1.2.3") + self.assertIsNotNone(heartbeat) + self.assertIsNotNone(heartbeat.id) + self.assertIsNotNone(heartbeat.instance_id) + self.assertEqual(instance_id, heartbeat.instance_id) + self.assertIsNotNone(heartbeat.updated_at) + self.assertIsNotNone(heartbeat.guest_agent_version) + self.assertEqual("1.2.3", heartbeat.guest_agent_version) + + # retrieve the record + heartbeat_found = AgentHeartBeat.find_by_instance_id( + instance_id=instance_id) + self.assertIsNotNone(heartbeat_found) + self.assertEqual(heartbeat_found.id, heartbeat.id) + self.assertEqual(heartbeat_found.instance_id, heartbeat.instance_id) + self.assertEqual(heartbeat_found.updated_at, heartbeat.updated_at) + self.assertEqual( + heartbeat_found.guest_agent_version, heartbeat.guest_agent_version) + + def test_find_by_instance_id_none(self): + """ + Test to retrieve a guest agents when id is None + """ + heartbeat_found = None + exception_raised = False + try: + heartbeat_found = AgentHeartBeat.find_by_instance_id( + instance_id=None) + except exception.ModelNotFoundError: + exception_raised = True + + self.assertIsNone(heartbeat_found) + self.assertTrue(exception_raised) + + def test_find_by_instance_id_not_found(self): + """ + Test to retrieve a guest agents when id is not found + """ + instance_id = str(uuid.uuid4()) + heartbeat_found = None + exception_raised = False + try: + heartbeat_found = AgentHeartBeat.find_by_instance_id( + instance_id=instance_id) + except exception.ModelNotFoundError: + exception_raised = True + + self.assertIsNone(heartbeat_found) + self.assertTrue(exception_raised) + + def test_find_all_by_version(self): + """ + Test to retrieve all guest agents with a particular version + """ + # create some unique records with the same version + version = str(uuid.uuid4()) + + for x in xrange(5): + instance_id = str(uuid.uuid4()) + heartbeat = AgentHeartBeat.create( + instance_id=instance_id, + guest_agent_version=version, + deleted=0) + self.assertIsNotNone(heartbeat) + + # get all guests by version + heartbeats = AgentHeartBeat.find_all_by_version(version) + self.assertIsNotNone(heartbeats) + self.assertEqual(5, heartbeats.count()) + + def test_find_all_by_version_none(self): + """ + Test to retrieve all guest agents with a None version + """ + heartbeats = None + exception_raised = False + try: + heartbeats = AgentHeartBeat.find_all_by_version(None) + except exception.ModelNotFoundError: + exception_raised = True + + self.assertIsNone(heartbeats) + self.assertTrue(exception_raised) + + def test_find_all_by_version_not_found(self): + """ + Test to retrieve all guest agents with a non-existing version + """ + version = str(uuid.uuid4()) + exception_raised = False + heartbeats = None + try: + heartbeats = AgentHeartBeat.find_all_by_version(version) + except exception.ModelNotFoundError: + exception_raised = True + + self.assertIsNone(heartbeats) + self.assertTrue(exception_raised) + + def test_update_heartbeat(self): + """ + Test to show the upgrade scenario that will be used by conductor + """ + # create a unique record + instance_id = str(uuid.uuid4()) + heartbeat = AgentHeartBeat.create( + instance_id=instance_id, guest_agent_version="1.2.3") + self.assertIsNotNone(heartbeat) + self.assertIsNotNone(heartbeat.id) + self.assertIsNotNone(heartbeat.instance_id) + self.assertEqual(instance_id, heartbeat.instance_id) + self.assertIsNotNone(heartbeat.updated_at) + self.assertIsNotNone(heartbeat.guest_agent_version) + self.assertEqual("1.2.3", heartbeat.guest_agent_version) + + # retrieve the record + heartbeat_found = AgentHeartBeat.find_by_instance_id( + instance_id=instance_id) + self.assertIsNotNone(heartbeat_found) + self.assertEqual(heartbeat_found.id, heartbeat.id) + self.assertEqual(heartbeat_found.instance_id, heartbeat.instance_id) + self.assertEqual(heartbeat_found.updated_at, heartbeat.updated_at) + self.assertEqual( + heartbeat_found.guest_agent_version, heartbeat.guest_agent_version) + + # update + AgentHeartBeat().update(id=heartbeat_found.id, + instance_id=instance_id, + guest_agent_version="1.2.3") + + # retrieve the record + updated_heartbeat = AgentHeartBeat.find_by_instance_id( + instance_id=instance_id) + self.assertIsNotNone(updated_heartbeat) + self.assertEqual(updated_heartbeat.id, heartbeat.id) + self.assertEqual(updated_heartbeat.instance_id, heartbeat.instance_id) + self.assertEqual( + heartbeat_found.guest_agent_version, heartbeat.guest_agent_version) + + self.assertEqual(heartbeat_found.updated_at, heartbeat.updated_at)