From 21cc07bed337ae5f50673ad3e72bdccad199e77f Mon Sep 17 00:00:00 2001 From: xiaodongwang Date: Mon, 4 Aug 2014 16:33:03 -0700 Subject: [PATCH] update host state when updating clusterhost state. Change-Id: I9c208870de9cc6e6dd8310c32455a951b1f09585 --- compass/actions/update_progress.py | 11 +-- compass/api/api.py | 11 +++ compass/db/api/cluster.py | 25 +++--- compass/db/api/host.py | 6 +- compass/db/api/user.py | 13 +++ compass/db/models.py | 101 +++++++++++++++++++----- compass/log_analyzor/adapter_matcher.py | 88 +++++++++++---------- compass/tasks/tasks.py | 10 +++ requirements.txt | 3 +- service/compass-celeryd | 2 +- service/compass-progress-updated | 2 +- 11 files changed, 181 insertions(+), 91 deletions(-) diff --git a/compass/actions/update_progress.py b/compass/actions/update_progress.py index 96b000d4..5246bd54 100644 --- a/compass/actions/update_progress.py +++ b/compass/actions/update_progress.py @@ -88,16 +88,13 @@ def update_progress(cluster_hosts): clusterid = cluster.id adapter = cluster.adapter - os_installer = adapter.os_installer + os_installer = adapter.adapter_os_installer os_installer_name = os_installer.instance_name - package_installer = adapter.package_installer + package_installer = adapter.adapter_package_installer package_installer_name = package_installer.instance_name - distributed_system = cluster.distributed_system - distributed_system_name = distributed_system.name - - host = session.query(models.Host).first() - os_name = host.os_name + distributed_system_name = cluster.distributed_system_name + os_name = cluster.os_name os_names[clusterid] = os_name distributed_systems[clusterid] = distributed_system_name diff --git a/compass/api/api.py b/compass/api/api.py index f70f3ca5..0877b08b 100644 --- a/compass/api/api.py +++ b/compass/api/api.py @@ -238,6 +238,17 @@ def show_user(user_id): ) +@app.route("/current-user", methods=['GET']) +@log_user_action +@login_required +def show_current_user(): + """Get user.""" + data = _get_request_args() + return utils.make_json_response( + 200, user_api.get_current_user(current_user, **data) + ) + + @app.route("/users/", methods=['PUT']) @log_user_action @login_required diff --git a/compass/db/api/cluster.py b/compass/db/api/cluster.py index 17df9f07..c13041b2 100644 --- a/compass/db/api/cluster.py +++ b/compass/db/api/cluster.py @@ -1055,7 +1055,7 @@ def review_cluster(session, reviewer, cluster_id, review={}, **kwargs): deployed_os_config, host.os_id, True ) host_api.validate_host(session, host) - host.config_validated = True + utils.update_db_object(session, host, config_validated=True) package_config = cluster.package_config if package_config: metadata_api.validate_package_config( @@ -1070,8 +1070,8 @@ def review_cluster(session, reviewer, cluster_id, review={}, **kwargs): deployed_package_config, cluster.adapter_id, True ) - clusterhost.config_validated = True - cluster.config_validated = True + utils.update_db_object(session, clusterhost, config_validated=True) + utils.update_db_object(session, cluster, config_validated=True) return { 'cluster': cluster, 'clusterhosts': cluster.clusterhosts @@ -1108,16 +1108,8 @@ def deploy_cluster( ) is_cluster_editable(session, cluster, deployer) is_cluster_validated(session, cluster) - utils.update_db_object( - session, cluster.state, state='INITIALIZED' - ) + utils.update_db_object(session, cluster.state, state='INITIALIZED') for clusterhost in clusterhosts: - if cluster.distributed_system: - is_clusterhost_validated(session, clusterhost) - utils.update_db_object( - session, clusterhost.state, - state='INITIALIZED' - ) host = clusterhost.host if host_api.is_host_editable( session, host, deployer, @@ -1126,8 +1118,11 @@ def deploy_cluster( host_api.is_host_validated( session, host ) + utils.update_db_object(session, host.state, state='INITIALIZED') + if cluster.distributed_system: + is_clusterhost_validated(session, clusterhost) utils.update_db_object( - session, host.state, state='INITIALIZED' + session, clusterhost.state, state='INITIALIZED' ) celery_client.celery.send_task( @@ -1208,7 +1203,7 @@ def update_cluster_host_state( @utils.supported_filters( optional_support_keys=UPDATED_CLUSTERHOST_STATE_FIELDS ) -## @database.run_in_session() +@database.run_in_session() @user_api.check_user_permission_in_session( permission.PERMISSION_UPDATE_CLUSTERHOST_STATE ) @@ -1227,7 +1222,7 @@ def update_clusterhost_state( @utils.supported_filters( optional_support_keys=UPDATED_CLUSTER_STATE_FIELDS ) -## @database.run_in_session() +@database.run_in_session() @user_api.check_user_permission_in_session( permission.PERMISSION_UPDATE_CLUSTER_STATE ) diff --git a/compass/db/api/host.py b/compass/db/api/host.py index 19fa5f4d..21f57581 100644 --- a/compass/db/api/host.py +++ b/compass/db/api/host.py @@ -84,7 +84,7 @@ RESP_STATE_FIELDS = [ 'id', 'state', 'percentage', 'message' ] UPDATED_STATE_FIELDS = [ - 'id', 'state', 'percentage', 'message' + 'state', 'percentage', 'message' ] @@ -588,8 +588,8 @@ def get_host_state(session, getter, host_id, **kwargs): ).state_dict() -@utils.supported_filters(UPDATED_STATE_FIELDS) -## @database.run_in_session() +@utils.supported_filters(optional_support_keys=UPDATED_STATE_FIELDS) +@database.run_in_session() @user_api.check_user_permission_in_session( permission.PERMISSION_UPDATE_HOST_STATE ) diff --git a/compass/db/api/user.py b/compass/db/api/user.py index 391a8e75..418e1319 100644 --- a/compass/db/api/user.py +++ b/compass/db/api/user.py @@ -293,6 +293,19 @@ def get_user( ) +@utils.supported_filters() +@database.run_in_session() +@utils.wrap_to_dict(RESP_FIELDS) +def get_current_user( + session, getter, + exception_when_missing=True, **kwargs +): + """get field dict of a user.""" + return utils.get_db_object( + session, models.User, exception_when_missing, id=getter.id + ) + + @utils.supported_filters( optional_support_keys=SUPPORTED_FIELDS ) diff --git a/compass/db/models.py b/compass/db/models.py index 1414008f..abbb0b9d 100644 --- a/compass/db/models.py +++ b/compass/db/models.py @@ -173,7 +173,7 @@ class FieldMixin(HelperMixin): field_type_data = Column( 'field_type', Enum('basestring', 'int', 'float', 'list', 'bool'), - default='basestring' + ColumnDefault('basestring') ) display_type = Column( Enum( @@ -181,7 +181,7 @@ class FieldMixin(HelperMixin): 'multiselect', 'combobox', 'text', 'multitext', 'password' ), - default='text' + ColumnDefault('text') ) validator_data = Column('validator', Text) js_validator = Column(Text) @@ -270,13 +270,13 @@ class StateMixin(TimestampMixin, HelperMixin): 'UNINITIALIZED', 'INITIALIZED', 'INSTALLING', 'SUCCESSFUL', 'ERROR' ), - default='UNINITIIALIZED' + ColumnDefault('UNINITIALIZED') ) percentage = Column(Float, default=0.0) message = Column(Text, default='') severity = Column( Enum('INFO', 'WARNING', 'ERROR'), - default='INFO' + ColumnDefault('INFO') ) def update(self): @@ -284,13 +284,14 @@ class StateMixin(TimestampMixin, HelperMixin): self.percentage = 0.0 self.severity = 'INFO' self.message = '' - if self.severity == 'ERROR': - self.state = 'ERROR' + if self.state == 'INSTALLING': + if self.severity == 'ERROR': + self.state = 'ERROR' + elif self.percentage >= 1.0: + self.state = 'SUCCESSFUL' + self.percentage = 1.0 if self.state == 'SUCCESSFUL': self.percentage = 1.0 - if self.percentage >= 1.0: - self.state = 'SUCCESSFUL' - self.percentage = 1.0 super(StateMixin, self).update() @@ -392,6 +393,18 @@ class ClusterHostState(BASE, StateMixin): primary_key=True ) + def update(self): + host_state = self.host.state + if self.state == 'INITIALIZED': + if host_state.state in ['UNINITIALIZED']: + host_state.state = 'INITIALIZED' + host_state.update() + elif self.state == 'INSTALLING': + if host_state.state in ['UNINITIALIZED', 'INITIALIZED']: + host_state.state = 'INSTALLING' + host_state.update() + super(ClusterHostState, self).update() + class ClusterHost(BASE, TimestampMixin, HelperMixin): """ClusterHost table.""" @@ -429,6 +442,15 @@ class ClusterHost(BASE, TimestampMixin, HelperMixin): self.state = ClusterHostState() super(ClusterHost, self).__init__(**kwargs) + def update(self): + if self.host.reinstall_os: + if self.state in ['SUCCESSFUL', 'ERROR']: + if self.config_validated: + self.state.state = 'INITIALIZED' + else: + self.state.state = 'UNINITIALIZED' + self.state.update() + @property def name(self): return '%s.%s' % (self.host.name, self.cluster.name) @@ -440,6 +462,10 @@ class ClusterHost(BASE, TimestampMixin, HelperMixin): @patched_package_config.setter def patched_package_config(self, value): self.package_config = util.merge_dict(dict(self.package_config), value) + logging.info( + 'patch clusterhost %s package config: %s', + self.id, value + ) self.config_validated = False @property @@ -451,6 +477,10 @@ class ClusterHost(BASE, TimestampMixin, HelperMixin): package_config = dict(self.package_config) package_config.update(value) self.package_config = package_config + logging.info( + 'put clusterhost %s package config: %s', + self.id, value + ) self.config_validated = False @property @@ -460,7 +490,7 @@ class ClusterHost(BASE, TimestampMixin, HelperMixin): @patched_os_config.setter def patched_os_config(self, value): host = self.host - host.os_config = util.merge_dict(dict(host.os_config), value) + host.patched_os_config = value @property def put_os_config(self): @@ -469,9 +499,7 @@ class ClusterHost(BASE, TimestampMixin, HelperMixin): @put_os_config.setter def put_os_config(self, value): host = self.host - os_config = dict(host.os_config) - os_config.update(value) - host.os_config = os_config + host.put_os_config = value @hybrid_property def distributed_system_name(self): @@ -586,6 +614,26 @@ class HostState(BASE, StateMixin): host = self.host if self.state == 'INSTALLING': host.reinstall_os = False + for clusterhost in self.host.clusterhosts: + if clusterhost.state in [ + 'SUCCESSFUL', 'ERROR' + ]: + clusterhost.state = 'INSTALLING' + clusterhost.state.update() + elif self.state == 'UNINITIALIZED': + for clusterhost in self.host.clusterhosts: + if clusterhost.state in [ + 'INITIALIZED', 'INSTALLING', 'SUCCESSFUL', 'ERROR' + ]: + clusterhost.state = 'UNINITIALIZED' + clusterhost.state.update() + elif self.state == 'INITIALIZED': + for clusterhost in self.host.clusterhosts: + if clusterhost.state in [ + 'INSTALLING', 'SUCCESSFUL', 'ERROR' + ]: + clusterhost.state = 'INITIALIZED' + clusterhost.state.update() super(HostState, self).update() @@ -643,6 +691,7 @@ class Host(BASE, TimestampMixin, HelperMixin): @patched_os_config.setter def patched_os_config(self, value): self.os_config = util.merge_dict(dict(self.os_config), value) + logging.info('patch host os config in %s: %s', self.id, value) self.config_validated = False @property @@ -654,11 +703,11 @@ class Host(BASE, TimestampMixin, HelperMixin): os_config = dict(self.os_config) os_config.update(value) self.os_config = os_config + logging.info('put host os config in %s: %s', self.id, value) self.config_validated = False def __init__(self, id, **kwargs): self.id = id - self.name = str(self.id) self.state = HostState() super(Host, self).__init__(**kwargs) @@ -669,7 +718,12 @@ class Host(BASE, TimestampMixin, HelperMixin): def update(self): if self.reinstall_os: - self.state = HostState() + if self.state in ['SUCCESSFUL', 'ERROR']: + if self.config_validated: + self.state.state = 'INITIALIZED' + else: + self.state.state = 'UNINITIALIZED' + self.state.update() os = self.os if os: self.os_name = os.name @@ -864,7 +918,12 @@ class Cluster(BASE, TimestampMixin, HelperMixin): def update(self): if self.reinstall_distributed_system: - self.state = ClusterState() + if self.state in ['SUCCESSFUL', 'ERROR']: + if self.config_validated: + self.state.state = 'INITIALIZED' + else: + self.state.state = 'UNINITIALIZED' + self.state.update() os = self.os if os: self.os_name = os.name @@ -876,14 +935,10 @@ class Cluster(BASE, TimestampMixin, HelperMixin): self.adapter_name = adapter.name self.distributed_system = adapter.adapter_distributed_system self.distributed_system_name = self.distributed_system.name - self.put_package_config = { - 'roles': [role.name for role in adapter.roles] - } else: self.adapter_name = None self.distributed_system = None self.distributed_system_name = None - self.package_config = {} super(Cluster, self).update() def validate(self): @@ -926,6 +981,7 @@ class Cluster(BASE, TimestampMixin, HelperMixin): @patched_os_config.setter def patched_os_config(self, value): self.os_config = util.merge_dict(dict(self.os_config), value) + logging.info('patch cluster %s os config: %s', self.id, value) self.config_validated = False @property @@ -937,6 +993,7 @@ class Cluster(BASE, TimestampMixin, HelperMixin): os_config = dict(self.os_config) os_config.update(value) self.os_config = os_config + logging.info('put cluster %s os config: %s', self.id, value) self.config_validated = False @property @@ -947,6 +1004,7 @@ class Cluster(BASE, TimestampMixin, HelperMixin): def patched_package_config(self, value): package_config = dict(self.package_config) self.package_config = util.merge_dict(package_config, value) + logging.info('patch cluster %s package config: %s', self.id, value) self.config_validated = False @property @@ -958,6 +1016,7 @@ class Cluster(BASE, TimestampMixin, HelperMixin): package_config = dict(self.package_config) package_config.update(value) self.package_config = package_config + logging.info('put cluster %s package config: %s', self.id, value) self.config_validated = False @hybrid_property @@ -1345,7 +1404,7 @@ class Switch(BASE, HelperMixin, TimestampMixin): state = Column(Enum('initialized', 'unreachable', 'notsupported', 'repolling', 'error', 'under_monitoring', name='switch_state'), - default='initialized') + ColumnDefault('initialized')) filters = Column(JSONEncoded, default=[]) switch_machines = relationship( SwitchMachine, diff --git a/compass/log_analyzor/adapter_matcher.py b/compass/log_analyzor/adapter_matcher.py index 5f3ebdd2..16a7de81 100644 --- a/compass/log_analyzor/adapter_matcher.py +++ b/compass/log_analyzor/adapter_matcher.py @@ -200,7 +200,7 @@ class AdapterMatcher(object): session = database.current_session() clusterhost = session.query( ClusterHost - ).filter_by(id=hostid).first() + ).filter_by(host_id=hostid).first() if not clusterhost: logging.error( 'there is no clusterhost for %s in ClusterHost', @@ -226,7 +226,7 @@ class AdapterMatcher(object): def _update_host_progress(cls, hostid, host_progress, updater): """Updates host progress to db.""" - state = '' + state = 'INSTALLING' with database.session() as session: host = session.query( Host).filter_by(id=hostid).first() @@ -265,13 +265,13 @@ class AdapterMatcher(object): if host.state.severity == 'ERROR': state = 'ERROR' + logging.info('update host state by %s', updater) host_api.update_host_state( updater, hostid, state=state, percentage=host_progress.progress, - message=host_progress.message, - id=hostid + message=host_progress.message ) logging.debug( @@ -281,15 +281,16 @@ class AdapterMatcher(object): @classmethod def _update_clusterhost_progress( cls, + clusterid, hostid, clusterhost_progress, updater ): - clusterhost_state = '' + clusterhost_state = 'INSTALLING' with database.session() as session: clusterhost = session.query( - ClusterHost).filter_by(id=hostid).first() + ClusterHost).filter_by(host_id=hostid).first() if not clusterhost.state: logging.error( @@ -322,8 +323,10 @@ class AdapterMatcher(object): if clusterhost.state.severity == 'ERROR': clusterhost_state = 'ERROR' - cluster_api.update_clusterhost_state( + logging.info('updatge clusterhost state by %s', updater) + cluster_api.update_cluster_host_state( updater, + clusterid, hostid, state=clusterhost_state, percentage=clusterhost_progress.progress, @@ -364,40 +367,44 @@ class AdapterMatcher(object): cluster_messages = {} cluster_severities = set([]) cluster_installing_hosts = 0 + cluster_completed_hosts = 0 cluster_failed_hosts = 0 - hostids = [] clusterhosts = cluster.clusterhosts - hosts = [clusterhost.host for clusterhost in clusterhosts] - for host in hosts: - if host.state: - hostids.append(host.id) - cluster_progress += host.state.percentage + if not cluster.distributed_system: + hosts = [clusterhost.host for clusterhost in clusterhosts] + for host in hosts: + if host.state: + cluster_progress += host.state.percentage + if host.state.state == 'INSTALLING': + cluster_installing_hosts += 1 + elif host.state.state == 'SUCCESSFUL': + cluster_completed_hosts += 1 + elif host.state.state == 'ERROR': + cluster_failed_hosts += 1 if host.state.message: cluster_messages[host.name] = host.state.message - if host.state.severity: cluster_severities.add(host.state.severity) + else: + for clusterhost in clusterhosts: + if clusterhost.state: + cluster_progress += clusterhost.state.percentage + if clusterhost.state.state == 'INSTALLING': + cluster_installing_hosts += 1 + elif clusterhost.state.state == 'SUCCESSFUL': + cluster_completed_hosts += 1 + elif clusterhost.state.state == 'ERROR': + cluster_failed_hosts += 1 + if clusterhost.state.message: + cluster_messages[clusterhost.name] = ( + clusterhost.state.message + ) + if clusterhost.state.severity: + cluster_severities.add(clusterhost.state.severity) - for clusterhost in clusterhosts: - if clusterhost.state: - cluster_progress += clusterhost.state.percentage - if clusterhost.state.state == 'INSTALLING': - cluster_installing_hosts += 1 - elif (clusterhost.host.state.state not in - ['ERROR', 'INITIALIZED'] and - clusterhost.state.state != 'ERORR'): - cluster_installing_hosts += 1 - elif (clusterhost.state.state == 'ERROR' or - clusterhost.host.state.state == 'ERROR'): - cluster_failed_hosts += 1 - - if clusterhost.state.message: - cluster_messages[host.name] = clusterhost.state.message - - if clusterhost.state.severity: - cluster_severities.add(clusterhost.state.severity) - - cluster.state.percentage = cluster_progress / (len(hostids) * 2) + cluster.state.percentage = ( + float(cluster_completed_hosts) / float(cluster.state.total_hosts) + ) cluster.state.message = '\n'.join( [ '%s: %s' % (hostname, message) @@ -410,19 +417,15 @@ class AdapterMatcher(object): break if cluster.state.percentage >= 1.0: - cluster.state.state = 'READY' + cluster.state.state = 'SUCCESSFUL' if cluster.state.severity == 'ERROR': cluster.state.state = 'ERROR' - if cluster.state.state != 'INSTALLING': - cluster.mutable = True - cluster.state.installing_hosts = cluster_installing_hosts cluster.state.total_hosts = len(clusterhosts) cluster.state.failed_hosts = cluster_failed_hosts - cluster.state.completed_hosts = cluster.state.total_hosts - \ - cluster.state.installing_hosts - cluster.state.failed_hosts + cluster.state.completed_hosts = cluster_completed_hosts logging.debug( 'update cluster %s state %s', @@ -433,7 +436,9 @@ class AdapterMatcher(object): host_progresses = {} clusterhost_progresses = {} updater = user_api.get_user_object( - 'admin@abc.com' + 'admin@abc.com', + expire_timestamp=datetime.datetime.now() + + datetime.timedelta(seconds=10000) ) with database.session(): for hostid in hostids: @@ -500,6 +505,7 @@ class AdapterMatcher(object): _, _, clusterhost_progress = clusterhost_progresses[hostid] self._update_host_progress(hostid, host_progress, updater) self._update_clusterhost_progress( + clusterid, hostid, clusterhost_progress, updater diff --git a/compass/tasks/tasks.py b/compass/tasks/tasks.py index b2c6cbbb..d8d10735 100644 --- a/compass/tasks/tasks.py +++ b/compass/tasks/tasks.py @@ -19,6 +19,7 @@ import logging from celery.signals import celeryd_init +from celery.signals import setup_logging from compass.actions import deploy from compass.actions import poll_switch @@ -44,6 +45,14 @@ def global_celery_init(**_): metadata_api.load_metadatas() +@setup_logging.connect() +def tasks_setup_logging(**_): + """Setup logging options from compass setting.""" + flags.init() + flags.OPTIONS.logfile = setting.CELERY_LOGFILE + logsetting.init() + + @celery.task(name='compass.tasks.pollswitch') def pollswitch( poller_email, ip_addr, credentials, @@ -129,6 +138,7 @@ def update_clusters_progress(cluster_hosts): :param cluster_hosts: the cluster and hosts of each cluster to update. :type cluster_hosts: dict of int to list of int """ + logging.info('update_clusters_progress: %s', cluster_hosts) try: update_progress.update_progress(cluster_hosts) except Exception as error: diff --git a/requirements.txt b/requirements.txt index 9c339c2a..cdf3172f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,7 +4,7 @@ flask-restful flask-sqlalchemy flask-login celery -cheetah +cheetah==2.4.1 netaddr paramiko==1.7.5 simplejson @@ -14,6 +14,5 @@ redis flask-wtf itsdangerous importlib -MySQL-python lockfile daemon diff --git a/service/compass-celeryd b/service/compass-celeryd index 8ba99be4..0559e2c0 100755 --- a/service/compass-celeryd +++ b/service/compass-celeryd @@ -26,7 +26,7 @@ SUSE=/etc/SuSE-release CELERY=$CeleryPath if [ -f $DEBIAN ]; then - . /lib/lsb/init_functions + . /lib/lsb/init-functions elif [ -f $SUSE -a -r /etc/rc.status ]; then . /etc/rc.status else diff --git a/service/compass-progress-updated b/service/compass-progress-updated index 9a634bf0..e2eab84c 100755 --- a/service/compass-progress-updated +++ b/service/compass-progress-updated @@ -26,7 +26,7 @@ SUSE=/etc/SuSE-release PYTHON=$Python if [ -f $DEBIAN ]; then - . /lib/lsb/init_functions + . /lib/lsb/init-functions elif [ -f $SUSE -a -r /etc/rc.status ]; then . /etc/rc.status else