Ignore outdated messages sent to conductor

Adds a sent_at field to conductor messages in the manager so that
messages that get out of sync on their way to conductor don't cause
trouble by updating instances or backups with out-of-date information.

Implements: blueprint conductor-ignore-old-messages
Change-Id: If06d2f9c9f993100be66f71274240a59b05b66e3
This commit is contained in:
Ed Cranford 2014-01-17 17:46:32 -06:00
parent 1f3a4fda87
commit 119f3d6cd4
11 changed files with 287 additions and 33 deletions

View File

@ -16,6 +16,7 @@
from trove.common import cfg
from trove.openstack.common.rpc import proxy
from trove.openstack.common import log as logging
from trove.openstack.common.gettextutils import _
CONF = cfg.CONF
@ -34,17 +35,20 @@ class API(proxy.RpcProxy):
"""Create the routing key for conductor."""
return CONF.conductor_queue
def heartbeat(self, instance_id, payload, sent=None):
    """Cast an async heartbeat message to the conductor queue.

    :param instance_id: ID of the instance the heartbeat describes.
    :param payload: dict of heartbeat data (e.g. 'service_status').
    :param sent: float timestamp of when the message was sent, so the
        conductor can discard messages that arrive out of order.
        Defaults to None (conductor then skips the staleness check).
    """
    # This span contained both the pre-change and post-change definition
    # lines (diff residue); only the updated method is kept.
    LOG.debug(_("Making async call to cast heartbeat for instance: %s")
              % instance_id)
    self.cast(self.context, self.make_msg("heartbeat",
                                          instance_id=instance_id,
                                          sent=sent,
                                          payload=payload))
def update_backup(self, instance_id, backup_id, sent=None,
                  **backup_fields):
    """Cast an async message to update a backup record via conductor.

    :param instance_id: ID of the instance the backup belongs to.
    :param backup_id: ID of the backup record to update.
    :param sent: float timestamp of when the message was sent, so the
        conductor can discard messages that arrive out of order.
    :param backup_fields: backup attributes to update (name, state, ...).
    """
    # This span contained both the pre-change and post-change definition
    # lines (diff residue); only the updated method is kept.
    LOG.debug(_("Making async call to cast update_backup for instance: %s")
              % instance_id)
    self.cast(self.context, self.make_msg("update_backup",
                                          instance_id=instance_id,
                                          backup_id=backup_id,
                                          sent=sent,
                                          **backup_fields))

View File

@ -13,12 +13,15 @@
# under the License.
from trove.backup import models as bkup_models
from trove.common import cfg
from trove.common import exception
from trove.common.context import TroveContext
from trove.common.instance import ServiceStatus
from trove.conductor.models import LastSeen
from trove.instance import models as t_models
from trove.openstack.common import periodic_task
from trove.openstack.common import log as logging
from trove.common import cfg
from trove.openstack.common import periodic_task
from trove.openstack.common.gettextutils import _
LOG = logging.getLogger(__name__)
RPC_API_VERSION = "1.0"
@ -34,35 +37,92 @@ class Manager(periodic_task.PeriodicTasks):
auth_token=CONF.nova_proxy_admin_pass,
tenant=CONF.nova_proxy_admin_tenant_name)
def heartbeat(self, context, instance_id, payload):
LOG.debug("Instance ID: %s" % str(instance_id))
LOG.debug("Payload: %s" % str(payload))
def _message_too_old(self, instance_id, method_name, sent):
    """Decide whether an incoming conductor message should be discarded.

    Compares the message's 'sent' timestamp against the newest one
    recorded in LastSeen for this (instance_id, method_name) pair,
    updating the record when the message is fresh.

    :param instance_id: instance the message concerns.
    :param method_name: conductor method name ('heartbeat', ...).
    :param sent: float timestamp the sender stamped on the message,
        or None when the sender predates this field.
    :returns: True if the message is stale and must be dropped,
        False if it should be processed.
    """
    LOG.debug(_("Instance %(instance)s sent %(method)s at %(sent)s ")
              % {"instance": instance_id,
                 "method": method_name,
                 "sent": sent})
    # Old guestagents don't stamp messages; let those through unchecked.
    if sent is None:
        LOG.error(_("Sent field not present. Cannot compare."))
        return False
    try:
        record = LastSeen.load(instance_id=instance_id,
                               method_name=method_name)
    except exception.NotFound:
        # This is fine.
        record = None
    # First message from this (instance, method): record it and accept.
    if record is None:
        LOG.debug(_("Did not find any previous message. Creating."))
        record = LastSeen.create(instance_id=instance_id,
                                 method_name=method_name,
                                 sent=sent)
        record.save()
        return False
    # 'sent' round-trips through the DB as a string; compare as floats.
    if float(record.sent) < sent:
        LOG.debug(_("Rec'd message is younger than last seen. Updating."))
        record.sent = sent
        record.save()
        return False
    LOG.error(_("Rec'd message is older than last seen. Discarding."))
    return True
def heartbeat(self, context, instance_id, payload, sent=None):
    """Apply a guestagent heartbeat to the instance's service status.

    :param context: RPC context (currently unused here).
    :param instance_id: instance the heartbeat is for.
    :param payload: dict; when it carries a 'service_status' description,
        the stored InstanceServiceStatus is updated to match.
    :param sent: float timestamp from the sender; stale messages
        (older than the last seen one) are silently dropped.
    """
    LOG.debug(_("Instance ID: %s") % str(instance_id))
    LOG.debug(_("Payload: %s") % str(payload))
    # NOTE(review): the status row is fetched before the staleness check,
    # so a stale message still costs a DB lookup (and find_by may raise
    # if the row is missing) — consider checking _message_too_old first.
    status = t_models.InstanceServiceStatus.find_by(
        instance_id=instance_id)
    if self._message_too_old(instance_id, 'heartbeat', sent):
        return
    # Only update when the guest actually reported a status.
    if payload.get('service_status') is not None:
        status.set_status(ServiceStatus.from_description(
            payload['service_status']))
        status.save()
def update_backup(self, context, instance_id, backup_id,
                  sent=None, **backup_fields):
    """Update fields of a backup record, discarding stale messages.

    :param context: RPC context (tenant check is still a TODO).
    :param instance_id: instance the backup belongs to; must match the
        record's instance_id or the update is rejected.
    :param backup_id: ID of the backup record to update.
    :param sent: float timestamp from the sender; messages older than
        the last seen 'update_backup' for this instance are dropped.
    :param backup_fields: attributes to set on the backup; keys that
        are not attributes of the record are ignored.
    """
    # This span interleaved pre-change and post-change diff lines
    # (duplicate signature continuation and LOG.error blocks); only the
    # updated method is kept.
    LOG.debug(_("Instance ID: %s") % str(instance_id))
    LOG.debug(_("Backup ID: %s") % str(backup_id))
    backup = bkup_models.DBBackup.find_by(id=backup_id)
    # TODO(datsun180b): use context to verify tenant matches

    if self._message_too_old(instance_id, 'update_backup', sent):
        return

    # Some verification based on IDs
    if backup_id != backup.id:
        fields = {
            'expected': backup_id,
            'found': backup.id,
        }
        LOG.error(_("Backup IDs mismatch! Expected %(expected)s, "
                    "found %(found)s") % fields)
        return
    if instance_id != backup.instance_id:
        fields = {
            'expected': instance_id,
            'found': backup.instance_id,
        }
        LOG.error(_("Backup instance IDs mismatch! Expected %(expected)s, "
                    "found %(found)s") % fields)
        return

    for k, v in backup_fields.items():
        if hasattr(backup, k):
            fields = {
                'key': k,
                'value': v,
            }
            LOG.debug(_("Backup %(key)s: %(value)s") % fields)
            setattr(backup, k, v)
    backup.save()

52
trove/conductor/models.py Normal file
View File

@ -0,0 +1,52 @@
# Copyright 2014 OpenStack Foundation
#Licensed under the Apache License, Version 2.0 (the "License");
#you may not use this file except in compliance with the License.
#You may obtain a copy of the License at
#
#http://www.apache.org/licenses/LICENSE-2.0
#
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
from trove.db import get_db_api
from trove.openstack.common import log as logging
LOG = logging.getLogger(__name__)
def persisted_models():
    """Map table names to the model classes Conductor persists."""
    models = {
        'conductor_lastseen': LastSeen,
    }
    return models
class LastSeen(object):
    """Conductor-only record of the newest message per (instance, method).

    Used to discard messages that arrive late and out of order: one row
    exists per (instance_id, method_name) pair, holding the 'sent'
    timestamp of the freshest message processed so far.
    """

    _auto_generated_attrs = []
    _data_fields = ['instance_id', 'method_name', 'sent']
    _table_name = 'conductor_lastseen'
    preserve_on_delete = False

    def __init__(self, instance_id, method_name, sent):
        # (instance_id, method_name) is the composite key; 'sent' is the
        # float timestamp stamped on the message by the sender.
        self.instance_id = instance_id
        self.method_name = method_name
        self.sent = sent

    def save(self):
        """Persist this record through the configured DB API."""
        return get_db_api().save(self)

    @classmethod
    def create(cls, instance_id, method_name, sent):
        """Build a new record and immediately persist it."""
        record = cls(instance_id, method_name, sent)
        return record.save()

    @classmethod
    def load(cls, instance_id, method_name):
        """Fetch the record for (instance_id, method_name).

        Presumably raises NotFound when absent — callers catch it.
        """
        return get_db_api().find_by(cls,
                                    instance_id=instance_id,
                                    method_name=method_name)

View File

@ -59,6 +59,8 @@ def map(engine, models):
Table('configurations', meta, autoload=True))
orm.mapper(models['configuration_parameters'],
Table('configuration_parameters', meta, autoload=True))
orm.mapper(models['conductor_lastseen'],
Table('conductor_lastseen', meta, autoload=True))
def mapping_exists(model):

View File

@ -24,24 +24,44 @@ import sqlalchemy.types
logger = logging.getLogger('trove.db.sqlalchemy.migrate_repo.schema')
String = lambda length: sqlalchemy.types.String(length=length)
class String(sqlalchemy.types.String):
    # Real subclass (replacing a lambda factory) so isinstance checks and
    # further subclassing work; 'length' stays a required positional arg.
    def __init__(self, length, *args, **kwargs):
        super(String, self).__init__(*args, length=length, **kwargs)
Text = lambda: sqlalchemy.types.Text(length=None)
class Text(sqlalchemy.types.Text):
    # Subclass replacing the old lambda; keeps the unbounded default length.
    def __init__(self, length=None, *args, **kwargs):
        super(Text, self).__init__(*args, length=length, **kwargs)
Boolean = lambda: sqlalchemy.types.Boolean(create_constraint=True, name=None)
class Boolean(sqlalchemy.types.Boolean):
    # Subclass replacing the old lambda; preserves its defaults
    # (create_constraint=True, name=None).
    def __init__(self, create_constraint=True, name=None, *args, **kwargs):
        super(Boolean, self).__init__(*args,
                                      create_constraint=create_constraint,
                                      name=name,
                                      **kwargs)
DateTime = lambda: sqlalchemy.types.DateTime(timezone=False)
class DateTime(sqlalchemy.types.DateTime):
    # Subclass replacing the old lambda; naive (timezone=False) by default.
    def __init__(self, timezone=False, *args, **kwargs):
        super(DateTime, self).__init__(*args,
                                       timezone=timezone,
                                       **kwargs)
Integer = lambda: sqlalchemy.types.Integer()
class Integer(sqlalchemy.types.Integer):
    # Pure pass-through subclass, kept for symmetry with the other types.
    def __init__(self, *args, **kwargs):
        super(Integer, self).__init__(*args, **kwargs)
BigInteger = lambda: sqlalchemy.types.BigInteger()
class BigInteger(sqlalchemy.types.BigInteger):
    # Pure pass-through subclass, kept for symmetry with the other types.
    def __init__(self, *args, **kwargs):
        super(BigInteger, self).__init__(*args, **kwargs)
Float = lambda: sqlalchemy.types.Float()
class Float(sqlalchemy.types.Float):
    # Pure pass-through subclass, kept for symmetry with the other types.
    def __init__(self, *args, **kwargs):
        super(Float, self).__init__(*args, **kwargs)
def create_tables(tables):

View File

@ -0,0 +1,41 @@
# Copyright 2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy.schema import Column
from sqlalchemy.schema import MetaData
from trove.db.sqlalchemy.migrate_repo.schema import Float
from trove.db.sqlalchemy.migrate_repo.schema import String
from trove.db.sqlalchemy.migrate_repo.schema import Table
from trove.db.sqlalchemy.migrate_repo.schema import create_tables
from trove.db.sqlalchemy.migrate_repo.schema import drop_tables
meta = MetaData()

# One row per (instance_id, method_name) pair; 'sent' holds the float
# timestamp of the newest message conductor has processed for that pair.
conductor_lastseen = Table(
    'conductor_lastseen',
    meta,
    Column('instance_id', String(36), primary_key=True, nullable=False),
    Column('method_name', String(36), primary_key=True, nullable=False),
    Column('sent', Float(precision=32)))
def upgrade(migrate_engine):
    """Create the conductor_lastseen table."""
    meta.bind = migrate_engine
    create_tables([conductor_lastseen])
def downgrade(migrate_engine):
    """Drop the conductor_lastseen table."""
    meta.bind = migrate_engine
    drop_tables([conductor_lastseen])

View File

@ -50,6 +50,7 @@ def configure_db(options, models_mapper=None):
from trove.backup import models as backup_models
from trove.extensions.security_group import models as secgrp_models
from trove.configuration import models as configurations_models
from trove.conductor import models as conductor_models
model_modules = [
base_models,
@ -61,6 +62,7 @@ def configure_db(options, models_mapper=None):
backup_models,
secgrp_models,
configurations_models,
conductor_models,
]
models = {}

View File

@ -19,6 +19,7 @@ from trove.backup.models import BackupState
from trove.common import cfg
from trove.common import context as trove_context
from trove.conductor import api as conductor_api
from trove.guestagent.common import timeutils
from trove.guestagent.dbaas import get_filesystem_volume_stats
from trove.guestagent.datastore.mysql.service import ADMIN_USER_NAME
from trove.guestagent.datastore.mysql.service import get_auth_password
@ -68,7 +69,9 @@ class BackupAgent(object):
'size': stats.get('used', 0.0),
'state': BackupState.BUILDING,
}
conductor.update_backup(CONF.guest_id, **backup)
conductor.update_backup(CONF.guest_id,
sent=timeutils.float_utcnow(),
**backup)
try:
with runner(filename=backup_id, extra_opts=extra_opts,
@ -99,18 +102,24 @@ class BackupAgent(object):
except Exception:
LOG.exception("Error saving %s Backup", backup_id)
backup.update({'state': BackupState.FAILED})
conductor.update_backup(CONF.guest_id, **backup)
conductor.update_backup(CONF.guest_id,
sent=timeutils.float_utcnow(),
**backup)
raise
except Exception:
LOG.exception("Error running backup: %s", backup_id)
backup.update({'state': BackupState.FAILED})
conductor.update_backup(CONF.guest_id, **backup)
conductor.update_backup(CONF.guest_id,
sent=timeutils.float_utcnow(),
**backup)
raise
else:
LOG.info("Saving %s Backup Info to model", backup_id)
backup.update({'state': BackupState.COMPLETED})
conductor.update_backup(CONF.guest_id, **backup)
conductor.update_backup(CONF.guest_id,
sent=timeutils.float_utcnow(),
**backup)
def execute_restore(self, context, backup_info, restore_location):

View File

@ -0,0 +1,6 @@
from trove.openstack.common import timeutils
from datetime import datetime
def float_utcnow():
    """Return the current UTC time as float seconds since the Unix epoch.

    Uses timeutils.utcnow() rather than time.time() so tests can
    override the clock through the openstack common timeutils helpers.

    The previous implementation formatted with strftime("%s.%f"):
    "%s" is a non-portable glibc extension, and it interprets the naive
    UTC datetime in the *local* timezone, skewing the value on any host
    not running in UTC. Plain timedelta arithmetic avoids both problems
    while preserving microsecond precision.
    """
    return (timeutils.utcnow() - datetime(1970, 1, 1)).total_seconds()

View File

@ -22,6 +22,7 @@ from trove.common import cfg
from trove.common import context
from trove.common import instance as rd_instance
from trove.conductor import api as conductor_api
from trove.guestagent.common import timeutils
from trove.instance import models as rd_models
from trove.openstack.common import log as logging
@ -114,7 +115,9 @@ class BaseDbStatus(object):
heartbeat = {
'service_status': status.description,
}
conductor_api.API(ctxt).heartbeat(CONF.guest_id, heartbeat)
conductor_api.API(ctxt).heartbeat(CONF.guest_id,
heartbeat,
sent=timeutils.float_utcnow())
LOG.debug("Successfully cast set_status.")
self.status = status

View File

@ -16,9 +16,10 @@ import testtools
from mockito import unstub
from trove.backup import models as bkup_models
from trove.common import exception as t_exception
from trove.common import instance as t_instance
from trove.common import utils
from trove.common.instance import ServiceStatuses
from trove.conductor import manager as conductor_manager
from trove.guestagent.common import timeutils
from trove.instance import models as t_models
from trove.tests.unittests.util import util
@ -45,7 +46,7 @@ class ConductorMethodTests(testtools.TestCase):
iss = t_models.InstanceServiceStatus(
id=new_id,
instance_id=self.instance_id,
status=t_instance.ServiceStatuses.NEW)
status=ServiceStatuses.NEW)
iss.save()
return new_id
@ -99,10 +100,10 @@ class ConductorMethodTests(testtools.TestCase):
def test_heartbeat_instance_status_changed(self):
    """A heartbeat carrying service_status updates the stored status."""
    # This span contained both the pre-change and post-change payload
    # and assertion lines (diff residue); only the updated test is kept.
    iss_id = self._create_iss()
    payload = {'service_status': ServiceStatuses.BUILDING.description}
    self.cond_mgr.heartbeat(None, self.instance_id, payload)
    iss = self._get_iss(iss_id)
    self.assertEqual(ServiceStatuses.BUILDING, iss.status)
# --- Tests for update_backup ---
@ -135,3 +136,57 @@ class ConductorMethodTests(testtools.TestCase):
name=new_name)
bkup = self._get_backup(bkup_id)
self.assertEqual(new_name, bkup.name)
# --- Tests for discarding old messages ---
def test_heartbeat_newer_timestamp_accepted(self):
    """A heartbeat stamped later than the last seen one is applied."""
    status_new = {'service_status': ServiceStatuses.NEW.description}
    status_building = {
        'service_status': ServiceStatuses.BUILDING.description}
    iss_id = self._create_iss()
    iss = self._get_iss(iss_id)
    first_ts = timeutils.float_utcnow()
    later_ts = first_ts + 60
    self.cond_mgr.heartbeat(None, self.instance_id, status_new,
                            sent=first_ts)
    self.cond_mgr.heartbeat(None, self.instance_id, status_building,
                            sent=later_ts)
    iss = self._get_iss(iss_id)
    self.assertEqual(ServiceStatuses.BUILDING, iss.status)
def test_heartbeat_older_timestamp_discarded(self):
    """A heartbeat stamped no later than the last seen one is dropped."""
    status_new = {'service_status': ServiceStatuses.NEW.description}
    status_building = {
        'service_status': ServiceStatuses.BUILDING.description}
    iss_id = self._create_iss()
    iss = self._get_iss(iss_id)
    stale_ts = timeutils.float_utcnow() - 60
    # Both messages carry the same timestamp: the second is not newer
    # than the first, so it must be discarded.
    self.cond_mgr.heartbeat(None, self.instance_id, status_new,
                            sent=stale_ts)
    self.cond_mgr.heartbeat(None, self.instance_id, status_building,
                            sent=stale_ts)
    iss = self._get_iss(iss_id)
    self.assertEqual(ServiceStatuses.NEW, iss.status)
def test_backup_newer_timestamp_accepted(self):
    """A backup update stamped later than the last seen one is applied."""
    original_name = "oldname"
    updated_name = "renamed"
    bkup_id = self._create_backup(original_name)
    bkup = self._get_backup(bkup_id)
    first_ts = timeutils.float_utcnow()
    later_ts = first_ts + 60
    self.cond_mgr.update_backup(None, self.instance_id, bkup_id,
                                sent=first_ts, name=original_name)
    self.cond_mgr.update_backup(None, self.instance_id, bkup_id,
                                sent=later_ts, name=updated_name)
    bkup = self._get_backup(bkup_id)
    self.assertEqual(updated_name, bkup.name)
def test_backup_older_timestamp_discarded(self):
    """A backup update stamped earlier than the last seen one is dropped."""
    original_name = "oldname"
    updated_name = "renamed"
    bkup_id = self._create_backup(original_name)
    bkup = self._get_backup(bkup_id)
    first_ts = timeutils.float_utcnow()
    earlier_ts = first_ts - 60
    self.cond_mgr.update_backup(None, self.instance_id, bkup_id,
                                sent=first_ts, name=original_name)
    # The rename arrives with an older timestamp and must be ignored.
    self.cond_mgr.update_backup(None, self.instance_id, bkup_id,
                                sent=earlier_ts, name=updated_name)
    bkup = self._get_backup(bkup_id)
    self.assertEqual(original_name, bkup.name)