Do not override RESTART_REQUIRED service status

The RESTART_REQUIRED service status set by the guest agent should not
be overridden by the health heartbeat mechanism.

The RESTART_REQUIRED status can only be changed when the instance is
restarted or rebooted.

Story: 2008612
Task: 41795
Change-Id: I98baf252452353237bc8fb14357df4e7bcb2867a
This commit is contained in:
Lingxian Kong 2021-02-10 09:53:06 +13:00
parent 6edd3332b6
commit 9c2e0bf3a0
5 changed files with 138 additions and 43 deletions

View File

@ -24,7 +24,7 @@ from trove.common.serializable_notification import SerializableNotification
from trove.conductor.models import LastSeen from trove.conductor.models import LastSeen
from trove.extensions.mysql import models as mysql_models from trove.extensions.mysql import models as mysql_models
from trove.instance import models as inst_models from trove.instance import models as inst_models
from trove.instance.service_status import ServiceStatus from trove.instance import service_status as svc_status
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
CONF = cfg.CONF CONF = cfg.CONF
@ -86,11 +86,20 @@ class Manager(periodic_task.PeriodicTasks):
"payload": str(payload)}) "payload": str(payload)})
status = inst_models.InstanceServiceStatus.find_by( status = inst_models.InstanceServiceStatus.find_by(
instance_id=instance_id) instance_id=instance_id)
if self._message_too_old(instance_id, 'heartbeat', sent): if self._message_too_old(instance_id, 'heartbeat', sent):
return return
if status.get_status() == svc_status.ServiceStatuses.RESTART_REQUIRED:
LOG.debug("Instance %s service status is RESTART_REQUIRED, "
"skip heartbeat", instance_id)
return
if payload.get('service_status') is not None: if payload.get('service_status') is not None:
status.set_status( status.set_status(
ServiceStatus.from_description(payload['service_status'])) svc_status.ServiceStatus.from_description(
payload['service_status'])
)
status.save() status.save()
def update_backup(self, context, instance_id, backup_id, def update_backup(self, context, instance_id, backup_id,

View File

@ -15,6 +15,7 @@
import enum import enum
import hashlib import hashlib
import os import os
from pathlib import Path
from requests.exceptions import ConnectionError from requests.exceptions import ConnectionError
from oslo_log import log as logging from oslo_log import log as logging
@ -245,34 +246,36 @@ class GuestLog(object):
'published': self._published_size}) 'published': self._published_size})
def _update_details(self): def _update_details(self):
# Make sure we can read the file
if not self._file_readable or not os.access(self._file, os.R_OK):
if not os.access(self._file, os.R_OK):
if operating_system.exists(self._file, as_root=True): if operating_system.exists(self._file, as_root=True):
operating_system.chmod( file_path = Path(self._file)
self._file, FileMode.ADD_ALL_R, as_root=True)
self._file_readable = True
if os.path.isfile(self._file): # Make sure guest agent can read the log file.
logstat = os.stat(self._file) if not os.access(self._file, os.R_OK):
self._size = logstat.st_size operating_system.chmod(self._file, FileMode.ADD_ALL_R,
as_root=True)
operating_system.chmod(str(file_path.parent),
FileMode.ADD_GRP_RX_OTH_RX,
as_root=True)
self._size = file_path.stat().st_size
self._update_log_header_digest(self._file) self._update_log_header_digest(self._file)
if self.status != LogStatus.Disabled: if self.status != LogStatus.Disabled:
if self._log_rotated(): if self._log_rotated():
self.status = LogStatus.Rotated self.status = LogStatus.Rotated
# See if we have stuff to publish # See if we have stuff to publish
elif logstat.st_size > self._published_size: elif self._size > self._published_size:
self._set_status(self._published_size, self._set_status(self._published_size,
LogStatus.Partial, LogStatus.Ready) LogStatus.Partial, LogStatus.Ready)
# We've published everything so far # We've published everything so far
elif logstat.st_size == self._published_size: elif self._size == self._published_size:
self._set_status(self._published_size, self._set_status(self._published_size,
LogStatus.Published, LogStatus.Enabled) LogStatus.Published, LogStatus.Enabled)
# We've already handled this case (log rotated) so what gives? # We've already handled this case (log rotated) so what gives?
else: else:
raise Exception(_("Bug in _log_rotated ?")) raise Exception(_("Bug in _log_rotated ?"))
else: else:
LOG.warning(f"File {self._file} does not exist")
self._published_size = 0 self._published_size = 0
self._size = 0 self._size = 0

View File

@ -607,23 +607,30 @@ def load_instance(cls, context, id, needs_server=False,
return cls(context, db_info, server, service_status) return cls(context, db_info, server, service_status)
def load_instance_with_info(cls, context, id, cluster_id=None): def update_service_status(task_status, service_status, ins_id):
db_info = get_db_info(context, id, cluster_id) """Update service status as needed."""
if (task_status == InstanceTasks.NONE and
service_status = InstanceServiceStatus.find_by(instance_id=id) service_status.status != srvstatus.ServiceStatuses.RESTART_REQUIRED and
if (db_info.task_status == InstanceTasks.NONE and
not service_status.is_uptodate()): not service_status.is_uptodate()):
LOG.warning('Guest agent heartbeat for instance %s has expried', id) LOG.warning('Guest agent heartbeat for instance %s has expried',
ins_id)
service_status.status = \ service_status.status = \
srvstatus.ServiceStatuses.FAILED_TIMEOUT_GUESTAGENT srvstatus.ServiceStatuses.FAILED_TIMEOUT_GUESTAGENT
def load_instance_with_info(cls, context, ins_id, cluster_id=None):
db_info = get_db_info(context, ins_id, cluster_id)
service_status = InstanceServiceStatus.find_by(instance_id=ins_id)
update_service_status(db_info.task_status, service_status, ins_id)
load_simple_instance_server_status(context, db_info) load_simple_instance_server_status(context, db_info)
load_simple_instance_addresses(context, db_info) load_simple_instance_addresses(context, db_info)
instance = cls(context, db_info, service_status) instance = cls(context, db_info, service_status)
load_guest_info(instance, context, id) load_guest_info(instance, context, ins_id)
load_server_group_info(instance, context) load_server_group_info(instance, context)
@ -901,6 +908,11 @@ class BaseInstance(SimpleInstance):
del_instance.set_status(srvstatus.ServiceStatuses.DELETED) del_instance.set_status(srvstatus.ServiceStatuses.DELETED)
del_instance.save() del_instance.save()
def set_servicestatus_restart(self):
del_instance = InstanceServiceStatus.find_by(instance_id=self.id)
del_instance.set_status(srvstatus.ServiceStatuses.RESTARTING)
del_instance.save()
def set_instance_fault_deleted(self): def set_instance_fault_deleted(self):
try: try:
del_fault = DBInstanceFault.find_by(instance_id=self.id) del_fault = DBInstanceFault.find_by(instance_id=self.id)
@ -1442,7 +1454,10 @@ class Instance(BuiltInstance):
LOG.info("Rebooting instance %s.", self.id) LOG.info("Rebooting instance %s.", self.id)
if self.db_info.cluster_id is not None and not self.context.is_admin: if self.db_info.cluster_id is not None and not self.context.is_admin:
raise exception.ClusterInstanceOperationNotSupported() raise exception.ClusterInstanceOperationNotSupported()
self.update_db(task_status=InstanceTasks.REBOOTING) self.update_db(task_status=InstanceTasks.REBOOTING)
self.set_servicestatus_restart()
task_api.API(self.context).reboot(self.id) task_api.API(self.context).reboot(self.id)
def restart(self): def restart(self):
@ -1450,13 +1465,10 @@ class Instance(BuiltInstance):
LOG.info("Restarting datastore on instance %s.", self.id) LOG.info("Restarting datastore on instance %s.", self.id)
if self.db_info.cluster_id is not None and not self.context.is_admin: if self.db_info.cluster_id is not None and not self.context.is_admin:
raise exception.ClusterInstanceOperationNotSupported() raise exception.ClusterInstanceOperationNotSupported()
# Set our local status since Nova might not change it quick enough.
# TODO(tim.simpson): Possible bad stuff can happen if this service
# shuts down before it can set status to NONE.
# We need a last updated time to mitigate this;
# after some period of tolerance, we'll assume the
# status is no longer in effect.
self.update_db(task_status=InstanceTasks.REBOOTING) self.update_db(task_status=InstanceTasks.REBOOTING)
self.set_servicestatus_restart()
task_api.API(self.context).restart(self.id) task_api.API(self.context).restart(self.id)
def detach_replica(self): def detach_replica(self):
@ -1829,9 +1841,9 @@ class Instances(object):
db.server_status = "SHUTDOWN" # Fake it... db.server_status = "SHUTDOWN" # Fake it...
db.addresses = [] db.addresses = []
datastore_status = InstanceServiceStatus.find_by( service_status = InstanceServiceStatus.find_by(
instance_id=db.id) instance_id=db.id)
if not datastore_status.status: # This should never happen. if not service_status.status: # This should never happen.
LOG.error("Server status could not be read for " LOG.error("Server status could not be read for "
"instance id(%s).", db.id) "instance id(%s).", db.id)
continue continue
@ -1839,24 +1851,15 @@ class Instances(object):
# Get the real-time service status. # Get the real-time service status.
LOG.debug('Task status for instance %s: %s', db.id, LOG.debug('Task status for instance %s: %s', db.id,
db.task_status) db.task_status)
if db.task_status == InstanceTasks.NONE: update_service_status(db.task_status, service_status, db.id)
last_heartbeat_delta = (
timeutils.utcnow() - datastore_status.updated_at)
agent_expiry_interval = timedelta(
seconds=CONF.agent_heartbeat_expiry)
if last_heartbeat_delta > agent_expiry_interval:
LOG.warning(
'Guest agent heartbeat for instance %s has '
'expried', id)
datastore_status.status = \
srvstatus.ServiceStatuses.FAILED_TIMEOUT_GUESTAGENT
except exception.ModelNotFoundError: except exception.ModelNotFoundError:
LOG.error("Server status could not be read for " LOG.error("Server status could not be read for "
"instance id(%s).", db.id) "instance id(%s).", db.id)
continue continue
ret.append(load_instance(context, db, datastore_status, ret.append(
server=server)) load_instance(context, db, service_status, server=server)
)
return ret return ret

View File

@ -36,6 +36,7 @@ class ServiceStatus(object):
ServiceStatuses.CRASHED._code, ServiceStatuses.CRASHED._code,
ServiceStatuses.BLOCKED._code, ServiceStatuses.BLOCKED._code,
ServiceStatuses.HEALTHY._code, ServiceStatuses.HEALTHY._code,
ServiceStatuses.RESTART_REQUIRED._code,
] ]
return self._code in allowed_statuses return self._code in allowed_statuses

View File

@ -11,11 +11,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from datetime import timedelta
from unittest import mock from unittest import mock
from trove.common import cfg from trove.common import cfg
from trove.common import clients from trove.common import clients
from trove.common import exception from trove.common import exception
from trove.common import timeutils
from trove.datastore import models as ds_models from trove.datastore import models as ds_models
from trove.instance import models as ins_models from trove.instance import models as ins_models
from trove.instance import service from trove.instance import service
@ -174,3 +176,80 @@ class TestInstanceController(trove_testtools.TestCase):
mock.ANY, "upgrade", mock.ANY, "upgrade",
instance_id=instance.id, instance_id=instance.id,
datastore_version_id=new_ds_version.id) datastore_version_id=new_ds_version.id)
@mock.patch('trove.instance.models.load_server_group_info')
@mock.patch('trove.instance.models.load_guest_info')
@mock.patch('trove.instance.models.load_simple_instance_addresses')
@mock.patch('trove.instance.models.load_simple_instance_server_status')
def test_show_with_restart_required(self, load_server_mock,
load_addr_mock, load_guest_mock,
load_server_grp_mock):
# Create an instance in db.
instance = ins_models.DBInstance.create(
name=self.random_name('instance'),
flavor_id=self.random_uuid(),
tenant_id=self.random_uuid(),
volume_size=1,
datastore_version_id=self.ds_version_imageid.id,
task_status=ins_models.InstanceTasks.NONE,
compute_instance_id=self.random_uuid(),
server_status='ACTIVE'
)
ins_models.InstanceServiceStatus.create(
instance_id=instance.id,
status=srvstatus.ServiceStatuses.RESTART_REQUIRED,
)
# workaround to reset updated_at field.
service_status = ins_models.InstanceServiceStatus.find_by(
instance_id=instance.id)
service_status.updated_at = timeutils.utcnow() - timedelta(
seconds=(CONF.agent_heartbeat_expiry + 60))
ins_models.get_db_api().save(service_status)
ret = self.controller.show(mock.MagicMock(), mock.ANY, instance.id)
self.assertEqual(200, ret.status)
ret_instance = ret.data(None)['instance']
self.assertEqual('ACTIVE', ret_instance.get('status'))
self.assertEqual('RESTART_REQUIRED',
ret_instance.get('operating_status'))
@mock.patch('trove.instance.models.load_server_group_info')
@mock.patch('trove.instance.models.load_guest_info')
@mock.patch('trove.instance.models.load_simple_instance_addresses')
@mock.patch('trove.instance.models.load_simple_instance_server_status')
def test_show_without_restart_required(self, load_server_mock,
load_addr_mock, load_guest_mock,
load_server_grp_mock):
# Create an instance in db.
instance = ins_models.DBInstance.create(
name=self.random_name('instance'),
flavor_id=self.random_uuid(),
tenant_id=self.random_uuid(),
volume_size=1,
datastore_version_id=self.ds_version_imageid.id,
task_status=ins_models.InstanceTasks.NONE,
compute_instance_id=self.random_uuid(),
server_status='ACTIVE'
)
ins_models.InstanceServiceStatus.create(
instance_id=instance.id,
status=srvstatus.ServiceStatuses.HEALTHY,
)
# workaround to reset updated_at field.
service_status = ins_models.InstanceServiceStatus.find_by(
instance_id=instance.id)
service_status.updated_at = timeutils.utcnow() - timedelta(
seconds=(CONF.agent_heartbeat_expiry + 60))
ins_models.get_db_api().save(service_status)
ret = self.controller.show(mock.MagicMock(), mock.ANY, instance.id)
self.assertEqual(200, ret.status)
ret_instance = ret.data(None)['instance']
self.assertEqual('ACTIVE', ret_instance.get('status'))
self.assertEqual('ERROR', ret_instance.get('operating_status'))