Add a new column and indexes to agent_heartbeats

Includes a new index on instance_id
Added a deleted and deleted_at column
Includes an index for deleted too
Includes in call to find all by version
Includes call in _delete_resources()
Fixed AgentHeartBeat delete call
Corrected downgrade script as per SlickNik
Needs the save method because of how we are calling create
Reworked migration to create table from scratch
Renamed migration from 026 --> 027
Exception handling, added not null to updated_at col
Renamed migration from 027 --> 028

Change-Id: Ie0568850856c32fdbf9495c496339730d6b5beef
Implements: blueprint upgrade-guestagent
This commit is contained in:
daniel-a-nguyen 2014-03-19 18:14:53 -07:00 committed by daniel-a-nguyen
parent 22a88859e4
commit 7b1fe23039
5 changed files with 330 additions and 1 deletions

View File

@ -0,0 +1,81 @@
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sqlalchemy.exc import OperationalError
from sqlalchemy.schema import Column
from sqlalchemy.schema import MetaData
from trove.db.sqlalchemy.migrate_repo.schema import Boolean
from trove.db.sqlalchemy.migrate_repo.schema import create_tables
from trove.db.sqlalchemy.migrate_repo.schema import DateTime
from trove.db.sqlalchemy.migrate_repo.schema import drop_tables
from trove.db.sqlalchemy.migrate_repo.schema import String
from trove.db.sqlalchemy.migrate_repo.schema import Table
from trove.openstack.common import log as logging
logger = logging.getLogger('trove.db.sqlalchemy.migrate_repo.schema')
def upgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
# new table with desired columns, indexes, and constraints
new_agent_heartbeats = Table(
'agent_heartbeats', meta,
Column('id', String(36), primary_key=True, nullable=False),
Column('instance_id', String(36),
nullable=False, unique=True, index=True),
Column('guest_agent_version', String(255), index=True),
Column('deleted', Boolean(), index=True),
Column('deleted_at', DateTime()),
Column('updated_at', DateTime(), nullable=False))
# original table from migration 005_heartbeat.py
previous_agent_heartbeats = Table('agent_heartbeats', meta, autoload=True)
try:
drop_tables([previous_agent_heartbeats])
except OperationalError as e:
logger.warn("This table may have been dropped by some other means.")
logger.warn(e)
create_tables([new_agent_heartbeats])
def downgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
# new table with desired columns, indexes, and constraints
new_agent_heartbeats = Table('agent_heartbeats', meta, autoload=True)
try:
drop_tables([new_agent_heartbeats])
except OperationalError as e:
logger.warn("This table may have been dropped by some other means.")
logger.warn(e)
# reset the migrate_engine
meta = MetaData()
meta.bind = migrate_engine
# original table from migration 005_heartbeat.py
previous_agent_heartbeats = Table(
'agent_heartbeats', meta, Column('id', String(36), primary_key=True,
nullable=False),
Column('instance_id', String(36), nullable=False),
Column('updated_at', DateTime()), extend_existing=True)
create_tables([previous_agent_heartbeats])

View File

@ -36,7 +36,8 @@ def persisted_models():
class AgentHeartBeat(dbmodels.DatabaseModelBase):
"""Defines the state of a Guest Agent."""
_data_fields = ['instance_id', 'updated_at']
_data_fields = ['instance_id', 'updated_at', 'guest_agent_version',
'deleted', 'deleted_at']
_table_name = 'agent_heartbeats'
def __init__(self, **kwargs):
@ -58,6 +59,32 @@ class AgentHeartBeat(dbmodels.DatabaseModelBase):
{'name': self.__class__.__name__, 'dict': self.__dict__})
return get_db_api().save(self)
@classmethod
def find_all_by_version(cls, guest_agent_version, deleted=0):
if guest_agent_version is None:
raise exception.ModelNotFoundError()
heartbeats = cls.find_all(guest_agent_version=guest_agent_version,
deleted=deleted)
if heartbeats is None or heartbeats.count() == 0:
raise exception.ModelNotFoundError(
guest_agent_version=guest_agent_version)
return heartbeats
@classmethod
def find_by_instance_id(cls, instance_id):
if instance_id is None:
raise exception.ModelNotFoundError(instance_id=instance_id)
try:
return cls.find_by(instance_id=instance_id)
except exception.NotFound as e:
LOG.error(e.message)
raise exception.ModelNotFoundError(instance_id=instance_id)
@staticmethod
def is_active(agent):
return (datetime.now() - agent.updated_at <

View File

@ -514,6 +514,7 @@ class BaseInstance(SimpleInstance):
_delete_resources)
def _delete_resources(self, deleted_at):
"""Implemented in subclass."""
pass
def delete_async(self):

View File

@ -20,6 +20,8 @@ Routes all the requests to the task manager.
from trove.common import cfg
from trove.common import exception
from trove.guestagent import models as agent_models
from trove.openstack.common.rpc import proxy
from trove.openstack.common import log as logging
@ -51,6 +53,14 @@ class API(proxy.RpcProxy):
"""Create the routing key for the taskmanager"""
return CONF.taskmanager_queue
def _delete_heartbeat(self, instance_id):
agent_heart_beat = agent_models.AgentHeartBeat()
try:
heartbeat = agent_heart_beat.find_by_instance_id(instance_id)
heartbeat.delete()
except exception.ModelNotFoundError as e:
LOG.error(e.message)
def resize_volume(self, new_size, instance_id):
LOG.debug("Making async call to resize volume for instance: %s"
% instance_id)
@ -86,6 +96,7 @@ class API(proxy.RpcProxy):
LOG.debug("Making async call to delete instance: %s" % instance_id)
self.cast(self.context,
self.make_msg("delete_instance", instance_id=instance_id))
self._delete_heartbeat(instance_id)
def create_backup(self, backup_info, instance_id):
LOG.debug("Making async call to create a backup for instance: %s" %

View File

@ -0,0 +1,209 @@
# Copyright 2014 Hewlett-Packard Development Company, L.P.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import testtools
import uuid
from trove.common import exception
from trove.guestagent.models import AgentHeartBeat
from trove.tests.unittests.util import util
class AgentHeartBeatTest(testtools.TestCase):
def setUp(self):
super(AgentHeartBeatTest, self).setUp()
util.init_db()
def tearDown(self):
super(AgentHeartBeatTest, self).tearDown()
def test_create(self):
"""
Test the creation of a new agent heartbeat record
"""
instance_id = str(uuid.uuid4())
heartbeat = AgentHeartBeat.create(
instance_id=instance_id)
self.assertIsNotNone(heartbeat)
self.assertIsNotNone(heartbeat.id)
self.assertIsNotNone(heartbeat.instance_id)
self.assertEqual(instance_id,
heartbeat.instance_id)
self.assertIsNotNone(heartbeat.updated_at)
self.assertIsNone(heartbeat.guest_agent_version)
def test_create_with_version(self):
"""
Test the creation of a new agent heartbeat record w/ guest version
"""
instance_id = str(uuid.uuid4())
heartbeat = AgentHeartBeat.create(
instance_id=instance_id,
guest_agent_version="1.2.3")
self.assertIsNotNone(heartbeat)
self.assertIsNotNone(heartbeat.id)
self.assertIsNotNone(heartbeat.instance_id)
self.assertEqual(instance_id,
heartbeat.instance_id)
self.assertIsNotNone(heartbeat.updated_at)
self.assertIsNotNone(heartbeat.guest_agent_version)
self.assertEqual("1.2.3", heartbeat.guest_agent_version)
def test_find_by_instance_id(self):
"""
Test to retrieve a guest agents by it's id
"""
# create a unique record
instance_id = str(uuid.uuid4())
heartbeat = AgentHeartBeat.create(
instance_id=instance_id, guest_agent_version="1.2.3")
self.assertIsNotNone(heartbeat)
self.assertIsNotNone(heartbeat.id)
self.assertIsNotNone(heartbeat.instance_id)
self.assertEqual(instance_id, heartbeat.instance_id)
self.assertIsNotNone(heartbeat.updated_at)
self.assertIsNotNone(heartbeat.guest_agent_version)
self.assertEqual("1.2.3", heartbeat.guest_agent_version)
# retrieve the record
heartbeat_found = AgentHeartBeat.find_by_instance_id(
instance_id=instance_id)
self.assertIsNotNone(heartbeat_found)
self.assertEqual(heartbeat_found.id, heartbeat.id)
self.assertEqual(heartbeat_found.instance_id, heartbeat.instance_id)
self.assertEqual(heartbeat_found.updated_at, heartbeat.updated_at)
self.assertEqual(
heartbeat_found.guest_agent_version, heartbeat.guest_agent_version)
def test_find_by_instance_id_none(self):
"""
Test to retrieve a guest agents when id is None
"""
heartbeat_found = None
exception_raised = False
try:
heartbeat_found = AgentHeartBeat.find_by_instance_id(
instance_id=None)
except exception.ModelNotFoundError:
exception_raised = True
self.assertIsNone(heartbeat_found)
self.assertTrue(exception_raised)
def test_find_by_instance_id_not_found(self):
"""
Test to retrieve a guest agents when id is not found
"""
instance_id = str(uuid.uuid4())
heartbeat_found = None
exception_raised = False
try:
heartbeat_found = AgentHeartBeat.find_by_instance_id(
instance_id=instance_id)
except exception.ModelNotFoundError:
exception_raised = True
self.assertIsNone(heartbeat_found)
self.assertTrue(exception_raised)
def test_find_all_by_version(self):
"""
Test to retrieve all guest agents with a particular version
"""
# create some unique records with the same version
version = str(uuid.uuid4())
for x in xrange(5):
instance_id = str(uuid.uuid4())
heartbeat = AgentHeartBeat.create(
instance_id=instance_id,
guest_agent_version=version,
deleted=0)
self.assertIsNotNone(heartbeat)
# get all guests by version
heartbeats = AgentHeartBeat.find_all_by_version(version)
self.assertIsNotNone(heartbeats)
self.assertEqual(5, heartbeats.count())
def test_find_all_by_version_none(self):
"""
Test to retrieve all guest agents with a None version
"""
heartbeats = None
exception_raised = False
try:
heartbeats = AgentHeartBeat.find_all_by_version(None)
except exception.ModelNotFoundError:
exception_raised = True
self.assertIsNone(heartbeats)
self.assertTrue(exception_raised)
def test_find_all_by_version_not_found(self):
"""
Test to retrieve all guest agents with a non-existing version
"""
version = str(uuid.uuid4())
exception_raised = False
heartbeats = None
try:
heartbeats = AgentHeartBeat.find_all_by_version(version)
except exception.ModelNotFoundError:
exception_raised = True
self.assertIsNone(heartbeats)
self.assertTrue(exception_raised)
def test_update_heartbeat(self):
"""
Test to show the upgrade scenario that will be used by conductor
"""
# create a unique record
instance_id = str(uuid.uuid4())
heartbeat = AgentHeartBeat.create(
instance_id=instance_id, guest_agent_version="1.2.3")
self.assertIsNotNone(heartbeat)
self.assertIsNotNone(heartbeat.id)
self.assertIsNotNone(heartbeat.instance_id)
self.assertEqual(instance_id, heartbeat.instance_id)
self.assertIsNotNone(heartbeat.updated_at)
self.assertIsNotNone(heartbeat.guest_agent_version)
self.assertEqual("1.2.3", heartbeat.guest_agent_version)
# retrieve the record
heartbeat_found = AgentHeartBeat.find_by_instance_id(
instance_id=instance_id)
self.assertIsNotNone(heartbeat_found)
self.assertEqual(heartbeat_found.id, heartbeat.id)
self.assertEqual(heartbeat_found.instance_id, heartbeat.instance_id)
self.assertEqual(heartbeat_found.updated_at, heartbeat.updated_at)
self.assertEqual(
heartbeat_found.guest_agent_version, heartbeat.guest_agent_version)
# update
AgentHeartBeat().update(id=heartbeat_found.id,
instance_id=instance_id,
guest_agent_version="1.2.3")
# retrieve the record
updated_heartbeat = AgentHeartBeat.find_by_instance_id(
instance_id=instance_id)
self.assertIsNotNone(updated_heartbeat)
self.assertEqual(updated_heartbeat.id, heartbeat.id)
self.assertEqual(updated_heartbeat.instance_id, heartbeat.instance_id)
self.assertEqual(
heartbeat_found.guest_agent_version, heartbeat.guest_agent_version)
self.assertEqual(heartbeat_found.updated_at, heartbeat.updated_at)