From 119f3d6cd47196eabfb655c719a2857fc2408875 Mon Sep 17 00:00:00 2001 From: Ed Cranford Date: Fri, 17 Jan 2014 17:46:32 -0600 Subject: [PATCH] Ignore outdated messages sent to conductor Adds a sent_at field to conductor messages in the manager so that messages that get out of sync on their way to conductor don't cause trouble by updating instances or backups with out-of-date information. Implements: blueprint conductor-ignore-old-messages Change-Id: If06d2f9c9f993100be66f71274240a59b05b66e3 --- trove/conductor/api.py | 12 ++- trove/conductor/manager.py | 86 ++++++++++++++++--- trove/conductor/models.py | 52 +++++++++++ trove/db/sqlalchemy/mappers.py | 2 + trove/db/sqlalchemy/migrate_repo/schema.py | 34 ++++++-- .../versions/021_conductor_last_seen.py | 41 +++++++++ trove/db/sqlalchemy/session.py | 2 + trove/guestagent/backup/backupagent.py | 17 +++- trove/guestagent/common/timeutils.py | 6 ++ trove/guestagent/datastore/service.py | 5 +- .../tests/unittests/conductor/test_methods.py | 63 +++++++++++++- 11 files changed, 287 insertions(+), 33 deletions(-) create mode 100644 trove/conductor/models.py create mode 100644 trove/db/sqlalchemy/migrate_repo/versions/021_conductor_last_seen.py create mode 100644 trove/guestagent/common/timeutils.py diff --git a/trove/conductor/api.py b/trove/conductor/api.py index f4f91bbb6e..10586a85e0 100644 --- a/trove/conductor/api.py +++ b/trove/conductor/api.py @@ -16,6 +16,7 @@ from trove.common import cfg from trove.openstack.common.rpc import proxy from trove.openstack.common import log as logging +from trove.openstack.common.gettextutils import _ CONF = cfg.CONF @@ -34,17 +35,20 @@ class API(proxy.RpcProxy): """Create the routing key for conductor.""" return CONF.conductor_queue - def heartbeat(self, instance_id, payload): - LOG.debug("Making async call to cast heartbeat for instance: %s" + def heartbeat(self, instance_id, payload, sent=None): + LOG.debug(_("Making async call to cast heartbeat for instance: %s") % instance_id) self.cast(self.context, self.make_msg("heartbeat", instance_id=instance_id, + sent=sent, payload=payload)) - def update_backup(self, instance_id, backup_id, **backup_fields): - LOG.debug("Making async call to cast update_backup for instance: %s" + def update_backup(self, instance_id, backup_id, sent=None, + **backup_fields): + LOG.debug(_("Making async call to cast update_backup for instance: %s") % instance_id) self.cast(self.context, self.make_msg("update_backup", instance_id=instance_id, backup_id=backup_id, + sent=sent, **backup_fields)) diff --git a/trove/conductor/manager.py b/trove/conductor/manager.py index 1d4b2271d4..a0c4f6ebc5 100644 --- a/trove/conductor/manager.py +++ b/trove/conductor/manager.py @@ -13,12 +13,15 @@ # under the License. from trove.backup import models as bkup_models +from trove.common import cfg +from trove.common import exception from trove.common.context import TroveContext from trove.common.instance import ServiceStatus +from trove.conductor.models import LastSeen from trove.instance import models as t_models -from trove.openstack.common import periodic_task from trove.openstack.common import log as logging -from trove.common import cfg +from trove.openstack.common import periodic_task +from trove.openstack.common.gettextutils import _ LOG = logging.getLogger(__name__) RPC_API_VERSION = "1.0" @@ -34,35 +37,92 @@ class Manager(periodic_task.PeriodicTasks): auth_token=CONF.nova_proxy_admin_pass, tenant=CONF.nova_proxy_admin_tenant_name) - def heartbeat(self, context, instance_id, payload): - LOG.debug("Instance ID: %s" % str(instance_id)) - LOG.debug("Payload: %s" % str(payload)) + def _message_too_old(self, instance_id, method_name, sent): + fields = { + "instance": instance_id, + "method": method_name, + "sent": sent, + } + LOG.debug(_("Instance %(instance)s sent %(method)s at %(sent)s ") + % fields) + + if sent is None: + LOG.error(_("Sent field not present. Cannot compare.")) + return False + + seen = None + try: + seen = LastSeen.load(instance_id=instance_id, + method_name=method_name) + except exception.NotFound: + # This is fine. + pass + + if seen is None: + LOG.debug(_("Did not find any previous message. Creating.")) + seen = LastSeen.create(instance_id=instance_id, + method_name=method_name, + sent=sent) + seen.save() + return False + + last_sent = float(seen.sent) + if last_sent < sent: + LOG.debug(_("Rec'd message is younger than last seen. Updating.")) + seen.sent = sent + seen.save() + return False + + else: + LOG.error(_("Rec'd message is older than last seen. Discarding.")) + return True + + def heartbeat(self, context, instance_id, payload, sent=None): + LOG.debug(_("Instance ID: %s") % str(instance_id)) + LOG.debug(_("Payload: %s") % str(payload)) status = t_models.InstanceServiceStatus.find_by( instance_id=instance_id) + if self._message_too_old(instance_id, 'heartbeat', sent): + return if payload.get('service_status') is not None: status.set_status(ServiceStatus.from_description( payload['service_status'])) status.save() def update_backup(self, context, instance_id, backup_id, - **backup_fields): - LOG.debug("Instance ID: %s" % str(instance_id)) - LOG.debug("Backup ID: %s" % str(backup_id)) + sent=None, **backup_fields): + LOG.debug(_("Instance ID: %s") % str(instance_id)) + LOG.debug(_("Backup ID: %s") % str(backup_id)) backup = bkup_models.DBBackup.find_by(id=backup_id) # TODO(datsun180b): use context to verify tenant matches + if self._message_too_old(instance_id, 'update_backup', sent): + return + # Some verification based on IDs if backup_id != backup.id: - LOG.error("Backup IDs mismatch! Expected %s, found %s" % - (backup_id, backup.id)) + fields = { + 'expected': backup_id, + 'found': backup.id, + } + LOG.error(_("Backup IDs mismatch! Expected %(expected)s, " + "found %(found)s") % fields) return if instance_id != backup.instance_id: - LOG.error("Backup instance IDs mismatch! Expected %s, found %s" % - (instance_id, backup.instance_id)) + fields = { + 'expected': instance_id, + 'found': backup.instance_id, + } + LOG.error(_("Backup instance IDs mismatch! Expected %(expected)s, " + "found %(found)s") % fields) return for k, v in backup_fields.items(): if hasattr(backup, k): - LOG.debug("Backup %s: %s" % (k, v)) + fields = { + 'key': k, + 'value': v, + } + LOG.debug(_("Backup %(key)s: %(value)s") % fields) setattr(backup, k, v) backup.save() diff --git a/trove/conductor/models.py b/trove/conductor/models.py new file mode 100644 index 0000000000..3f7f799781 --- /dev/null +++ b/trove/conductor/models.py @@ -0,0 +1,52 @@ +#Copyright 2014 Openstack Foundation + +#Licensed under the Apache License, Version 2.0 (the "License"); +#you may not use this file except in compliance with the License. +#You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +#Unless required by applicable law or agreed to in writing, software +#distributed under the License is distributed on an "AS IS" BASIS, +#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +#See the License for the specific language governing permissions and +#limitations under the License. + +from trove.db import get_db_api +from trove.openstack.common import log as logging + +LOG = logging.getLogger(__name__) + + +def persisted_models(): + return {'conductor_lastseen': LastSeen} + + +class LastSeen(object): + """A table used only by Conductor to discard messages that arrive + late and out of order. + """ + _auto_generated_attrs = [] + _data_fields = ['instance_id', 'method_name', 'sent'] + _table_name = 'conductor_lastseen' + preserve_on_delete = False + + def __init__(self, instance_id, method_name, sent): + self.instance_id = instance_id + self.method_name = method_name + self.sent = sent + + def save(self): + return get_db_api().save(self) + + @classmethod + def load(cls, instance_id, method_name): + seen = get_db_api().find_by(cls, + instance_id=instance_id, + method_name=method_name) + return seen + + @classmethod + def create(cls, instance_id, method_name, sent): + seen = LastSeen(instance_id, method_name, sent) + return seen.save() diff --git a/trove/db/sqlalchemy/mappers.py b/trove/db/sqlalchemy/mappers.py index 97d12ed387..0b260f76a9 100644 --- a/trove/db/sqlalchemy/mappers.py +++ b/trove/db/sqlalchemy/mappers.py @@ -59,6 +59,8 @@ def map(engine, models): Table('configurations', meta, autoload=True)) orm.mapper(models['configuration_parameters'], Table('configuration_parameters', meta, autoload=True)) + orm.mapper(models['conductor_lastseen'], + Table('conductor_lastseen', meta, autoload=True)) def mapping_exists(model): diff --git a/trove/db/sqlalchemy/migrate_repo/schema.py b/trove/db/sqlalchemy/migrate_repo/schema.py index 578ea7effc..defe519874 100644 --- a/trove/db/sqlalchemy/migrate_repo/schema.py +++ b/trove/db/sqlalchemy/migrate_repo/schema.py @@ -24,24 +24,44 @@ import sqlalchemy.types logger = logging.getLogger('trove.db.sqlalchemy.migrate_repo.schema') -String = lambda length: sqlalchemy.types.String(length=length) +class String(sqlalchemy.types.String): + def __init__(self, length, *args, **kwargs): + super(String, self).__init__(*args, length=length, **kwargs) -Text = lambda: sqlalchemy.types.Text(length=None) +class Text(sqlalchemy.types.Text): + def __init__(self, length=None, *args, **kwargs): + super(Text, self).__init__(*args, length=length, **kwargs) -Boolean = lambda: sqlalchemy.types.Boolean(create_constraint=True, name=None) +class Boolean(sqlalchemy.types.Boolean): + def __init__(self, create_constraint=True, name=None, *args, **kwargs): + super(Boolean, self).__init__(*args, + create_constraint=create_constraint, + name=name, + **kwargs) -DateTime = lambda: sqlalchemy.types.DateTime(timezone=False) +class DateTime(sqlalchemy.types.DateTime): + def __init__(self, timezone=False, *args, **kwargs): + super(DateTime, self).__init__(*args, + timezone=timezone, + **kwargs) -Integer = lambda: sqlalchemy.types.Integer() +class Integer(sqlalchemy.types.Integer): + def __init__(self, *args, **kwargs): + super(Integer, self).__init__(*args, **kwargs) -BigInteger = lambda: sqlalchemy.types.BigInteger() +class BigInteger(sqlalchemy.types.BigInteger): + def __init__(self, *args, **kwargs): + super(BigInteger, self).__init__(*args, **kwargs) -Float = lambda: sqlalchemy.types.Float() + +class Float(sqlalchemy.types.Float): + def __init__(self, *args, **kwargs): + super(Float, self).__init__(*args, **kwargs) def create_tables(tables): diff --git a/trove/db/sqlalchemy/migrate_repo/versions/021_conductor_last_seen.py b/trove/db/sqlalchemy/migrate_repo/versions/021_conductor_last_seen.py new file mode 100644 index 0000000000..d93b847f1f --- /dev/null +++ b/trove/db/sqlalchemy/migrate_repo/versions/021_conductor_last_seen.py @@ -0,0 +1,41 @@ +# Copyright 2012 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from sqlalchemy.schema import Column +from sqlalchemy.schema import MetaData + +from trove.db.sqlalchemy.migrate_repo.schema import Float +from trove.db.sqlalchemy.migrate_repo.schema import String +from trove.db.sqlalchemy.migrate_repo.schema import Table +from trove.db.sqlalchemy.migrate_repo.schema import create_tables +from trove.db.sqlalchemy.migrate_repo.schema import drop_tables + +meta = MetaData() + +conductor_lastseen = Table( + 'conductor_lastseen', + meta, + Column('instance_id', String(36), primary_key=True, nullable=False), + Column('method_name', String(36), primary_key=True, nullable=False), + Column('sent', Float(precision=32))) + + +def upgrade(migrate_engine): + meta.bind = migrate_engine + create_tables([conductor_lastseen]) + + +def downgrade(migrate_engine): + meta.bind = migrate_engine + drop_tables([conductor_lastseen]) diff --git a/trove/db/sqlalchemy/session.py b/trove/db/sqlalchemy/session.py index 93e0e832b6..7bf7375679 100644 --- a/trove/db/sqlalchemy/session.py +++ b/trove/db/sqlalchemy/session.py @@ -50,6 +50,7 @@ def configure_db(options, models_mapper=None): from trove.backup import models as backup_models from trove.extensions.security_group import models as secgrp_models from trove.configuration import models as configurations_models + from trove.conductor import models as conductor_models model_modules = [ base_models, @@ -61,6 +62,7 @@ def configure_db(options, models_mapper=None): backup_models, secgrp_models, configurations_models, + conductor_models, ] models = {} diff --git a/trove/guestagent/backup/backupagent.py b/trove/guestagent/backup/backupagent.py index 7420f03782..7f0f1dcf55 100644 --- a/trove/guestagent/backup/backupagent.py +++ b/trove/guestagent/backup/backupagent.py @@ -19,6 +19,7 @@ from trove.backup.models import BackupState from trove.common import cfg from trove.common import context as trove_context from trove.conductor import api as conductor_api +from trove.guestagent.common import timeutils from trove.guestagent.dbaas import get_filesystem_volume_stats from trove.guestagent.datastore.mysql.service import ADMIN_USER_NAME from trove.guestagent.datastore.mysql.service import get_auth_password @@ -68,7 +69,9 @@ class BackupAgent(object): 'size': stats.get('used', 0.0), 'state': BackupState.BUILDING, } - conductor.update_backup(CONF.guest_id, **backup) + conductor.update_backup(CONF.guest_id, + sent=timeutils.float_utcnow(), + **backup) try: with runner(filename=backup_id, extra_opts=extra_opts, @@ -99,18 +102,24 @@ class BackupAgent(object): except Exception: LOG.exception("Error saving %s Backup", backup_id) backup.update({'state': BackupState.FAILED}) - conductor.update_backup(CONF.guest_id, **backup) + conductor.update_backup(CONF.guest_id, + sent=timeutils.float_utcnow(), + **backup) raise except Exception: LOG.exception("Error running backup: %s", backup_id) backup.update({'state': BackupState.FAILED}) - conductor.update_backup(CONF.guest_id, **backup) + conductor.update_backup(CONF.guest_id, + sent=timeutils.float_utcnow(), + **backup) raise else: LOG.info("Saving %s Backup Info to model", backup_id) backup.update({'state': BackupState.COMPLETED}) - conductor.update_backup(CONF.guest_id, **backup) + conductor.update_backup(CONF.guest_id, + sent=timeutils.float_utcnow(), + **backup) def execute_restore(self, context, backup_info, restore_location): diff --git a/trove/guestagent/common/timeutils.py b/trove/guestagent/common/timeutils.py new file mode 100644 index 0000000000..09fcc88572 --- /dev/null +++ b/trove/guestagent/common/timeutils.py @@ -0,0 +1,6 @@ +from trove.openstack.common import timeutils +from datetime import datetime + + +def float_utcnow(): + return float(datetime.strftime(timeutils.utcnow(), "%s.%f")) diff --git a/trove/guestagent/datastore/service.py b/trove/guestagent/datastore/service.py index 7d9dbb3a71..76bb1e3691 100644 --- a/trove/guestagent/datastore/service.py +++ b/trove/guestagent/datastore/service.py @@ -22,6 +22,7 @@ from trove.common import cfg from trove.common import context from trove.common import instance as rd_instance from trove.conductor import api as conductor_api +from trove.guestagent.common import timeutils from trove.instance import models as rd_models from trove.openstack.common import log as logging @@ -114,7 +115,9 @@ class BaseDbStatus(object): heartbeat = { 'service_status': status.description, } - conductor_api.API(ctxt).heartbeat(CONF.guest_id, heartbeat) + conductor_api.API(ctxt).heartbeat(CONF.guest_id, + heartbeat, + sent=timeutils.float_utcnow()) LOG.debug("Successfully cast set_status.") self.status = status diff --git a/trove/tests/unittests/conductor/test_methods.py b/trove/tests/unittests/conductor/test_methods.py index 924e3cd080..1113633841 100644 --- a/trove/tests/unittests/conductor/test_methods.py +++ b/trove/tests/unittests/conductor/test_methods.py @@ -16,9 +16,10 @@ import testtools from mockito import unstub from trove.backup import models as bkup_models from trove.common import exception as t_exception -from trove.common import instance as t_instance from trove.common import utils +from trove.common.instance import ServiceStatuses from trove.conductor import manager as conductor_manager +from trove.guestagent.common import timeutils from trove.instance import models as t_models from trove.tests.unittests.util import util @@ -45,7 +46,7 @@ class ConductorMethodTests(testtools.TestCase): iss = t_models.InstanceServiceStatus( id=new_id, instance_id=self.instance_id, - status=t_instance.ServiceStatuses.NEW) + status=ServiceStatuses.NEW) iss.save() return new_id @@ -99,10 +100,10 @@ class ConductorMethodTests(testtools.TestCase): def test_heartbeat_instance_status_changed(self): iss_id = self._create_iss() - payload = {'service_status': 'building'} + payload = {'service_status': ServiceStatuses.BUILDING.description} self.cond_mgr.heartbeat(None, self.instance_id, payload) iss = self._get_iss(iss_id) - self.assertEqual(t_instance.ServiceStatuses.BUILDING, iss.status) + self.assertEqual(ServiceStatuses.BUILDING, iss.status) # --- Tests for update_backup --- @@ -135,3 +136,57 @@ class ConductorMethodTests(testtools.TestCase): name=new_name) bkup = self._get_backup(bkup_id) self.assertEqual(new_name, bkup.name) + + # --- Tests for discarding old messages --- + + def test_heartbeat_newer_timestamp_accepted(self): + new_p = {'service_status': ServiceStatuses.NEW.description} + build_p = {'service_status': ServiceStatuses.BUILDING.description} + iss_id = self._create_iss() + iss = self._get_iss(iss_id) + now = timeutils.float_utcnow() + future = now + 60 + self.cond_mgr.heartbeat(None, self.instance_id, new_p, sent=now) + self.cond_mgr.heartbeat(None, self.instance_id, build_p, sent=future) + iss = self._get_iss(iss_id) + self.assertEqual(ServiceStatuses.BUILDING, iss.status) + + def test_heartbeat_older_timestamp_discarded(self): + new_p = {'service_status': ServiceStatuses.NEW.description} + build_p = {'service_status': ServiceStatuses.BUILDING.description} + iss_id = self._create_iss() + iss = self._get_iss(iss_id) + now = timeutils.float_utcnow() + past = now - 60 + self.cond_mgr.heartbeat(None, self.instance_id, new_p, sent=past) + self.cond_mgr.heartbeat(None, self.instance_id, build_p, sent=past) + iss = self._get_iss(iss_id) + self.assertEqual(ServiceStatuses.NEW, iss.status) + + def test_backup_newer_timestamp_accepted(self): + old_name = "oldname" + new_name = "renamed" + bkup_id = self._create_backup(old_name) + bkup = self._get_backup(bkup_id) + now = timeutils.float_utcnow() + future = now + 60 + self.cond_mgr.update_backup(None, self.instance_id, bkup_id, + sent=now, name=old_name) + self.cond_mgr.update_backup(None, self.instance_id, bkup_id, + sent=future, name=new_name) + bkup = self._get_backup(bkup_id) + self.assertEqual(new_name, bkup.name) + + def test_backup_older_timestamp_discarded(self): + old_name = "oldname" + new_name = "renamed" + bkup_id = self._create_backup(old_name) + bkup = self._get_backup(bkup_id) + now = timeutils.float_utcnow() + past = now - 60 + self.cond_mgr.update_backup(None, self.instance_id, bkup_id, + sent=now, name=old_name) + self.cond_mgr.update_backup(None, self.instance_id, bkup_id, + sent=past, name=new_name) + bkup = self._get_backup(bkup_id) + self.assertEqual(old_name, bkup.name)