diff --git a/bilean/common/config.py b/bilean/common/config.py index 8b98d5b..5cca774 100644 --- a/bilean/common/config.py +++ b/bilean/common/config.py @@ -42,9 +42,6 @@ service_opts = [ cfg.IntOpt('num_engine_workers', default=processutils.get_worker_count(), help=_('Number of heat-engine processes to fork and run.')), - cfg.StrOpt('environment_dir', - default='/etc/bilean/environments', - help=_('The directory to search for environment files.')), ] engine_opts = [ diff --git a/bilean/common/consts.py b/bilean/common/consts.py index d15d97a..e7a4e70 100644 --- a/bilean/common/consts.py +++ b/bilean/common/consts.py @@ -21,12 +21,30 @@ RPC_ATTRs = ( ENGINE_TOPIC, SCHEDULER_TOPIC, NOTIFICATION_TOPICS, + ENGINE_DISPATCHER_TOPIC, RPC_API_VERSION, ) = ( 'bilean-engine', 'bilean-scheduler', 'billing_notifications', - '1.1', + 'bilean_engine_dispatcher', + '1.0', +) + +ACTION_NAMES = ( + USER_CREATE_RESOURCE, USER_UPDATE_RESOURCE, USER_DELETE_RESOURCE, + USER_SETTLE_ACCOUNT, +) = ( + 'USER_CREATE_RESOURCE', 'USER_UPDATE_RESOURCE', 'USER_DELETE_RESOURCE', + 'USER_SETTLE_ACCOUNT', +) + +ACTION_STATUSES = ( + ACTION_INIT, ACTION_WAITING, ACTION_READY, ACTION_RUNNING, + ACTION_SUCCEEDED, ACTION_FAILED, ACTION_CANCELLED +) = ( + 'INIT', 'WAITING', 'READY', 'RUNNING', + 'SUCCEEDED', 'FAILED', 'CANCELLED', ) RPC_PARAMS = ( diff --git a/bilean/common/exception.py b/bilean/common/exception.py index 2495ccd..5fc76a9 100644 --- a/bilean/common/exception.py +++ b/bilean/common/exception.py @@ -88,6 +88,10 @@ class MultipleChoices(BileanException): "Please be more specific.") +class InvalidInput(BileanException): + msg_fmt = _("Invalid value '%(value)s' specified for '%(name)s'") + + class InvalidParameter(BileanException): msg_fmt = _("Invalid value '%(value)s' specified for '%(name)s'") @@ -108,12 +112,8 @@ class RuleNotSpecified(BileanException): msg_fmt = _("Rule not specified.") -class RuleOperationFailed(BileanException): - msg_fmt = _("%(message)s") - - -class RuleOperationTimeout(BileanException): - msg_fmt = _("%(message)s") +class ActionNotFound(BileanException): + msg_fmt = _("The action (%(action)s) could not be found.") class PolicyNotFound(BileanException): diff --git a/bilean/common/utils.py b/bilean/common/utils.py index f172f72..e7f8929 100644 --- a/bilean/common/utils.py +++ b/bilean/common/utils.py @@ -14,6 +14,7 @@ Utilities module. 
''' +import datetime import random import string @@ -151,7 +152,7 @@ def random_name(length=8): def format_time(value): """Cut microsecond and format to isoformat string.""" - if value: + if isinstance(value, datetime.datetime): value = value.replace(microsecond=0) value = value.isoformat() return value diff --git a/bilean/db/api.py b/bilean/db/api.py index c6a0f86..f570279 100644 --- a/bilean/db/api.py +++ b/bilean/db/api.py @@ -120,8 +120,8 @@ def resource_update(context, resource_id, values): return IMPL.resource_update(context, resource_id, values) -def resource_delete(context, resource_id): - IMPL.resource_delete(context, resource_id) +def resource_delete(context, resource_id, soft_delete=True): + IMPL.resource_delete(context, resource_id, soft_delete=soft_delete) # events @@ -187,9 +187,122 @@ def policy_delete(context, policy_id): # locks -def user_lock_acquire(user_id, engine_id): - return IMPL.user_lock_acquire(user_id, engine_id) +def user_lock_acquire(user_id, action_id): + return IMPL.user_lock_acquire(user_id, action_id) -def user_lock_release(user_id, engine_id=None): - return IMPL.user_lock_release(user_id, engine_id=engine_id) +def user_lock_release(user_id, action_id): + return IMPL.user_lock_release(user_id, action_id) + + +def user_lock_steal(user_id, action_id): + return IMPL.user_lock_steal(user_id, action_id) + + +# actions +def action_create(context, values): + return IMPL.action_create(context, values) + + +def action_update(context, action_id, values): + return IMPL.action_update(context, action_id, values) + + +def action_get(context, action_id, project_safe=True, refresh=False): + return IMPL.action_get(context, action_id, project_safe=project_safe, + refresh=refresh) + + +def action_get_all_by_owner(context, owner): + return IMPL.action_get_all_by_owner(context, owner) + + +def action_get_all(context, filters=None, limit=None, marker=None, sort=None, + project_safe=True): + return IMPL.action_get_all(context, filters=filters, sort=sort, + limit=limit, marker=marker, + project_safe=project_safe) + + +def action_check_status(context, action_id, timestamp): + return IMPL.action_check_status(context, action_id, timestamp) + + +def dependency_add(context, depended, dependent): + return IMPL.dependency_add(context, depended, dependent) + + +def dependency_get_depended(context, action_id): + return IMPL.dependency_get_depended(context, action_id) + + +def dependency_get_dependents(context, action_id): + return IMPL.dependency_get_dependents(context, action_id) + + +def action_mark_succeeded(context, action_id, timestamp): + return IMPL.action_mark_succeeded(context, action_id, timestamp) + + +def action_mark_failed(context, action_id, timestamp, reason=None): + return IMPL.action_mark_failed(context, action_id, timestamp, reason) + + +def action_mark_cancelled(context, action_id, timestamp): + return IMPL.action_mark_cancelled(context, action_id, timestamp) + + +def action_acquire(context, action_id, owner, timestamp): + return IMPL.action_acquire(context, action_id, owner, timestamp) + + +def action_acquire_first_ready(context, owner, timestamp): + return IMPL.action_acquire_first_ready(context, owner, timestamp) + + +def action_abandon(context, action_id): + return IMPL.action_abandon(context, action_id) + + +def action_lock_check(context, action_id, owner=None): + '''Check whether an action has been locked(by a owner).''' + return IMPL.action_lock_check(context, action_id, owner) + + +def action_signal(context, action_id, value): + '''Send signal to an action via 
DB.''' + return IMPL.action_signal(context, action_id, value) + + +def action_signal_query(context, action_id): + '''Query signal status for the sepcified action.''' + return IMPL.action_signal_query(context, action_id) + + +def action_delete(context, action_id, force=False): + return IMPL.action_delete(context, action_id, force) + + +# services +def service_create(context, host, binary, topic=None): + return IMPL.service_create(context, host, binary, topic=topic) + + +def service_update(context, service_id, values=None): + return IMPL.service_update(context, service_id, values=values) + + +def service_delete(context, service_id): + return IMPL.service_delete(context, service_id) + + +def service_get(context, service_id): + return IMPL.service_get(context, service_id) + + +def service_get_by_host_and_binary(context, host, binary): + return IMPL.service_get_by_host_and_binary(context, host, binary) + + +def service_get_all(context): + return IMPL.service_get_all(context) diff --git a/bilean/db/sqlalchemy/api.py b/bilean/db/sqlalchemy/api.py index 91c8e2b..9bdf882 100644 --- a/bilean/db/sqlalchemy/api.py +++ b/bilean/db/sqlalchemy/api.py @@ -19,12 +19,13 @@ from oslo_config import cfg from oslo_db.sqlalchemy import session as db_session from oslo_db.sqlalchemy import utils from oslo_log import log as logging +from oslo_utils import timeutils from sqlalchemy.orm.session import Session from bilean.common import consts from bilean.common import exception -from bilean.common.i18n import _LE +from bilean.common.i18n import _ from bilean.db.sqlalchemy import filters as db_filters from bilean.db.sqlalchemy import migration from bilean.db.sqlalchemy import models @@ -224,7 +225,7 @@ def rule_get_all(context, show_deleted=False, limit=None, return _paginate_query(context, query, models.Rule, limit=limit, marker=marker, sort_keys=keys, sort_dir=sort_dir, - default_sort_keys=['created_at']).all() + default_sort_keys=['id']).all() def rule_create(context, values): @@ -296,7 +297,7 @@ def resource_get_all(context, user_id=None, show_deleted=False, return _paginate_query(context, query, models.Resource, limit=limit, marker=marker, sort_keys=keys, sort_dir=sort_dir, - default_sort_keys=['created_at']).all() + default_sort_keys=['id']).all() def resource_create(context, values): @@ -307,7 +308,11 @@ def resource_create(context, values): def resource_update(context, resource_id, values): - resource = resource_get(context, resource_id) + project_safe = True + if context.is_admin: + project_safe = False + resource = resource_get(context, resource_id, show_deleted=True, + project_safe=project_safe) if resource is None: raise exception.ResourceNotFound(resource=resource_id) @@ -377,7 +382,7 @@ def event_get_all(context, user_id=None, limit=None, marker=None, return _paginate_query(context, query, models.Event, limit=limit, marker=marker, sort_keys=keys, sort_dir=sort_dir, - default_sort_keys=['timestamp']).all() + default_sort_keys=['id']).all() def event_create(context, values): @@ -478,42 +483,394 @@ def policy_delete(context, policy_id): # locks -def user_lock_acquire(user_id, engine_id): - '''Acquire lock on a user. +def user_lock_acquire(user_id, action_id): + session = get_session() + session.begin() - :param user_id: ID of the user. - :param engine_id: ID of the engine which wants to lock the user. - :return: A user lock if success else False. 
- ''' + lock = session.query(models.UserLock).get(user_id) + if lock is None: + lock = models.UserLock(user_id=user_id, action_id=action_id) + session.add(lock) + + session.commit() + return lock.action_id + + +def user_lock_release(user_id, action_id): + session = get_session() + session.begin() + + success = False + lock = session.query(models.UserLock).get(user_id) + if lock is not None and lock.action_id == action_id: + session.delete(lock) + success = True + + session.commit() + return success + + +def user_lock_steal(user_id, action_id): session = get_session() session.begin() lock = session.query(models.UserLock).get(user_id) if lock is not None: - return False + lock.action_id = action_id + lock.save(session) else: - try: - lock = models.UserLock(user_id=user_id, engine_id=engine_id) - session.add(lock) - except Exception as ex: - LOG.error(_LE('Error: %s'), six.text_type(ex)) - return False - + lock = models.UserLock(user_id=user_id, action_id=action_id) + session.add(lock) session.commit() - return lock + return lock.action_id -def user_lock_release(user_id, engine_id=None): - '''Release lock on a user. +# actions +def action_create(context, values): + action = models.Action() + action.update(values) + action.save(_session(context)) + return action - :param user_id: ID of the user. - :return: True indicates successful release, False indicates failure. - ''' + +def action_update(context, action_id, values): session = get_session() - session.begin() - lock = session.query(models.UserLock).get(user_id) - if lock is None: + action = session.query(models.Action).get(action_id) + if not action: + raise exception.ActionNotFound(action=action_id) + + action.update(values) + action.save(session) + + +def action_get(context, action_id, project_safe=True, refresh=False): + session = _session(context) + action = session.query(models.Action).get(action_id) + if action is None: + return None + + if not context.is_admin and project_safe: + if action.project != context.project: + return None + + session.refresh(action) + return action + + +def action_get_all_by_owner(context, owner_id): + query = model_query(context, models.Action).\ + filter_by(owner=owner_id) + return query.all() + + +def action_get_all(context, filters=None, limit=None, marker=None, + sort_keys=None, sort_dir=None): + + query = model_query(context, models.Action) + + if filters: + query = db_filters.exact_filter(query, models.Action, filters) + + sort_key_map = { + consts.ACTION_CREATED_AT: models.Action.created_at.key, + consts.ACTION_UPDATED_AT: models.Action.updated_at.key, + consts.ACTION_NAME: models.Action.name.key, + consts.ACTION_STATUS: models.Action.status.key, + } + keys = _get_sort_keys(sort_keys, sort_key_map) + + query = db_filters.exact_filter(query, models.Action, filters) + return _paginate_query(context, query, models.Action, + limit=limit, marker=marker, + sort_keys=keys, sort_dir=sort_dir, + default_sort_keys=['id']).all() + + +def action_check_status(context, action_id, timestamp): + session = _session(context) + q = session.query(models.ActionDependency) + count = q.filter_by(dependent=action_id).count() + if count > 0: + return consts.ACTION_WAITING + + action = session.query(models.Action).get(action_id) + if action.status == consts.ACTION_WAITING: + session.begin() + action.status = consts.ACTION_READY + action.status_reason = _('All depended actions completed.') + action.end_time = timestamp + action.save(session) session.commit() - return False - session.delete(lock) + + return action.status + + 
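+# Dependency bookkeeping (assumed reading of the schema below): a row in the
+# `dependency` table means the `dependent` action cannot run until the
+# `depended` action completes. Rows are removed when the depended action
+# succeeds, and dependent actions are recursively failed or cancelled by the
+# _mark_failed() and _mark_cancelled() helpers further down.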
+def dependency_get_depended(context, action_id): + session = _session(context) + q = session.query(models.ActionDependency).filter_by(dependent=action_id) + return [d.depended for d in q.all()] + + +def dependency_get_dependents(context, action_id): + session = _session(context) + q = session.query(models.ActionDependency).filter_by(depended=action_id) + return [d.dependent for d in q.all()] + + +def dependency_add(context, depended, dependent): + if isinstance(depended, list) and isinstance(dependent, list): + raise exception.NotSupport( + _('Multiple dependencies between lists not support')) + + session = _session(context) + + if isinstance(depended, list): + session.begin() + for d in depended: + r = models.ActionDependency(depended=d, dependent=dependent) + session.add(r) + + query = session.query(models.Action).filter_by(id=dependent) + query.update({'status': consts.ACTION_WAITING, + 'status_reason': _('Waiting for depended actions.')}, + synchronize_session=False) + session.commit() + return + + # Only dependent can be a list now, convert it to a list if it + # is not a list + if not isinstance(dependent, list): # e.g. B,C,D depend on A + dependents = [dependent] + else: + dependents = dependent + + session.begin() + for d in dependents: + r = models.ActionDependency(depended=depended, dependent=d) + session.add(r) + + q = session.query(models.Action).filter(models.Action.id.in_(dependents)) + q.update({'status': consts.ACTION_WAITING, + 'status_reason': _('Waiting for depended actions.')}, + synchronize_session=False) session.commit() - return True + + +def action_mark_succeeded(context, action_id, timestamp): + session = _session(context) + session.begin() + + query = session.query(models.Action).filter_by(id=action_id) + values = { + 'owner': None, + 'status': consts.ACTION_SUCCEEDED, + 'status_reason': _('Action completed successfully.'), + 'end_time': timestamp, + } + query.update(values, synchronize_session=False) + + subquery = session.query(models.ActionDependency).filter_by( + depended=action_id) + subquery.delete(synchronize_session=False) + session.commit() + + +def _mark_failed(session, action_id, timestamp, reason=None): + # mark myself as failed + query = session.query(models.Action).filter_by(id=action_id) + values = { + 'owner': None, + 'status': consts.ACTION_FAILED, + 'status_reason': (six.text_type(reason) if reason else + _('Action execution failed')), + 'end_time': timestamp, + } + query.update(values, synchronize_session=False) + + query = session.query(models.ActionDependency) + query = query.filter_by(depended=action_id) + dependents = [d.dependent for d in query.all()] + query.delete(synchronize_session=False) + + for d in dependents: + _mark_failed(session, d, timestamp) + + +def action_mark_failed(context, action_id, timestamp, reason=None): + session = _session(context) + session.begin() + _mark_failed(session, action_id, timestamp, reason) + session.commit() + + +def _mark_cancelled(session, action_id, timestamp, reason=None): + query = session.query(models.Action).filter_by(id=action_id) + values = { + 'owner': None, + 'status': consts.ACTION_CANCELLED, + 'status_reason': (six.text_type(reason) if reason else + _('Action execution failed')), + 'end_time': timestamp, + } + query.update(values, synchronize_session=False) + + query = session.query(models.ActionDependency) + query = query.filter_by(depended=action_id) + dependents = [d.dependent for d in query.all()] + query.delete(synchronize_session=False) + + for d in dependents: + 
_mark_cancelled(session, d, timestamp) + + +def action_mark_cancelled(context, action_id, timestamp, reason=None): + session = _session(context) + session.begin() + _mark_cancelled(session, action_id, timestamp, reason) + session.commit() + + +def action_acquire(context, action_id, owner, timestamp): + session = _session(context) + with session.begin(): + action = session.query(models.Action).get(action_id) + if not action: + return None + + if action.owner and action.owner != owner: + return None + + if action.status != consts.ACTION_READY: + msg = _('The action is not in an executable status: ' + '%s') % action.status + LOG.warning(msg) + return None + action.owner = owner + action.start_time = timestamp + action.status = consts.ACTION_RUNNING + action.status_reason = _('The action is being processed.') + + return action + + +def action_acquire_first_ready(context, owner, timestamp): + session = _session(context) + + with session.begin(): + action = session.query(models.Action).\ + filter_by(status=consts.ACTION_READY).\ + filter_by(owner=None).first() + + if action: + action.owner = owner + action.start_time = timestamp + action.status = consts.ACTION_RUNNING + action.status_reason = _('The action is being processed.') + + return action + + +def action_abandon(context, action_id): + '''Abandon an action for other workers to execute again. + + This API is always called with the action locked by the current + worker. There is no chance the action is gone or stolen by others. + ''' + + query = model_query(context, models.Action) + action = query.get(action_id) + + action.owner = None + action.start_time = None + action.status = consts.ACTION_READY + action.status_reason = _('The action was abandoned.') + action.save(query.session) + return action + + +def action_lock_check(context, action_id, owner=None): + action = model_query(context, models.Action).get(action_id) + if not action: + raise exception.ActionNotFound(action=action_id) + + if owner: + return owner if owner == action.owner else action.owner + else: + return action.owner if action.owner else None + + +def action_signal(context, action_id, value): + query = model_query(context, models.Action) + action = query.get(action_id) + if not action: + return + + action.control = value + action.save(query.session) + + +def action_signal_query(context, action_id): + action = model_query(context, models.Action).get(action_id) + if not action: + return None + + return action.control + + +def action_delete(context, action_id, force=False): + session = _session(context) + action = session.query(models.Action).get(action_id) + if not action: + return + if ((action.status == 'WAITING') or (action.status == 'RUNNING') or + (action.status == 'SUSPENDED')): + + raise exception.ResourceBusyError(resource_type='action', + resource_id=action_id) + session.begin() + session.delete(action) + session.commit() + session.flush() + + +# services +def service_create(context, host, binary, topic=None): + time_now = timeutils.utcnow() + svc = models.Service(host=host, binary=binary, + topic=topic, created_at=time_now, + updated_at=time_now) + svc.save(_session(context)) + return svc + + +def service_update(context, service_id, values=None): + + service = service_get(context, service_id) + if not service: + return + + if values is None: + values = {} + + values.update({'updated_at': timeutils.utcnow()}) + service.update(values) + service.save(_session(context)) + return service + + +def service_delete(context, service_id): + session = _session(context) + 
session.query(models.Service).filter_by( + id=service_id).delete(synchronize_session='fetch') + + +def service_get(context, service_id): + return model_query(context, models.Service).get(service_id) + + +def service_get_by_host_and_binary(context, host, binary): + query = model_query(context, models.Service) + return query.filter_by(host=host).filter_by(binary=binary).first() + + +def service_get_all(context): + return model_query(context, models.Service).all() diff --git a/bilean/db/sqlalchemy/migrate_repo/versions/001_bilean_init.py b/bilean/db/sqlalchemy/migrate_repo/versions/001_bilean_init.py index 4e60e90..1b26629 100644 --- a/bilean/db/sqlalchemy/migrate_repo/versions/001_bilean_init.py +++ b/bilean/db/sqlalchemy/migrate_repo/versions/001_bilean_init.py @@ -32,8 +32,8 @@ def upgrade(migrate_engine): sqlalchemy.Column('rate', sqlalchemy.Float), sqlalchemy.Column('credit', sqlalchemy.Integer), sqlalchemy.Column('last_bill', sqlalchemy.DateTime), - sqlalchemy.Column('status', sqlalchemy.String(10)), - sqlalchemy.Column('status_reason', sqlalchemy.String(255)), + sqlalchemy.Column('status', sqlalchemy.String(255)), + sqlalchemy.Column('status_reason', sqlalchemy.Text), sqlalchemy.Column('created_at', sqlalchemy.DateTime), sqlalchemy.Column('updated_at', sqlalchemy.DateTime), sqlalchemy.Column('deleted_at', sqlalchemy.DateTime), @@ -109,11 +109,66 @@ def upgrade(migrate_engine): mysql_charset='utf8' ) + action = sqlalchemy.Table( + 'action', meta, + sqlalchemy.Column('id', sqlalchemy.String(36), + primary_key=True, nullable=False), + sqlalchemy.Column('name', sqlalchemy.String(63)), + sqlalchemy.Column('context', types.Dict), + sqlalchemy.Column('target', sqlalchemy.String(36)), + sqlalchemy.Column('action', sqlalchemy.String(255)), + sqlalchemy.Column('cause', sqlalchemy.String(255)), + sqlalchemy.Column('owner', sqlalchemy.String(36)), + sqlalchemy.Column('start_time', sqlalchemy.Float(precision='24,8')), + sqlalchemy.Column('end_time', sqlalchemy.Float(precision='24,8')), + sqlalchemy.Column('timeout', sqlalchemy.Integer), + sqlalchemy.Column('inputs', types.Dict), + sqlalchemy.Column('outputs', types.Dict), + sqlalchemy.Column('data', types.Dict), + sqlalchemy.Column('status', sqlalchemy.String(255)), + sqlalchemy.Column('status_reason', sqlalchemy.Text), + sqlalchemy.Column('created_at', sqlalchemy.DateTime), + sqlalchemy.Column('updated_at', sqlalchemy.DateTime), + mysql_engine='InnoDB', + mysql_charset='utf8' + ) + + dependency = sqlalchemy.Table( + 'dependency', meta, + sqlalchemy.Column('id', sqlalchemy.String(36), + primary_key=True, nullable=False), + sqlalchemy.Column('depended', + sqlalchemy.String(36), + sqlalchemy.ForeignKey('action.id'), + nullable=False), + sqlalchemy.Column('dependent', + sqlalchemy.String(36), + sqlalchemy.ForeignKey('action.id'), + nullable=False), + mysql_engine='InnoDB', + mysql_charset='utf8' + ) + user_lock = sqlalchemy.Table( 'user_lock', meta, sqlalchemy.Column('user_id', sqlalchemy.String(36), primary_key=True, nullable=False), - sqlalchemy.Column('engine_id', sqlalchemy.String(36)), + sqlalchemy.Column('action_id', sqlalchemy.String(36)), + mysql_engine='InnoDB', + mysql_charset='utf8' + ) + + service = sqlalchemy.Table( + 'service', meta, + sqlalchemy.Column('id', sqlalchemy.String(36), + primary_key=True, nullable=False), + sqlalchemy.Column('host', sqlalchemy.String(255)), + sqlalchemy.Column('binary', sqlalchemy.String(255)), + sqlalchemy.Column('topic', sqlalchemy.String(255)), + sqlalchemy.Column('disabled', sqlalchemy.Boolean), + 
sqlalchemy.Column('disabled_reason', sqlalchemy.String(255)), + sqlalchemy.Column('created_at', sqlalchemy.DateTime), + sqlalchemy.Column('updated_at', sqlalchemy.DateTime), mysql_engine='InnoDB', mysql_charset='utf8' ) @@ -124,7 +179,10 @@ def upgrade(migrate_engine): rule, resource, event, + action, + dependency, user_lock, + service, ) for index, table in enumerate(tables): diff --git a/bilean/db/sqlalchemy/models.py b/bilean/db/sqlalchemy/models.py index 7b1c74f..6ae853a 100644 --- a/bilean/db/sqlalchemy/models.py +++ b/bilean/db/sqlalchemy/models.py @@ -87,17 +87,8 @@ class SoftDelete(object): class StateAware(object): - - status = sqlalchemy.Column('status', sqlalchemy.String(10)) - _status_reason = sqlalchemy.Column('status_reason', sqlalchemy.String(255)) - - @property - def status_reason(self): - return self._status_reason - - @status_reason.setter - def status_reason(self, reason): - self._status_reason = reason and reason[:255] or '' + status = sqlalchemy.Column('status', sqlalchemy.String(255)) + status_reason = sqlalchemy.Column('status_reason', sqlalchemy.Text) class User(BASE, BileanBase, SoftDelete, StateAware, models.TimestampMixin): @@ -146,8 +137,7 @@ class Resource(BASE, BileanBase, SoftDelete, models.TimestampMixin): """Represents a meta resource with rate""" __tablename__ = 'resource' - id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True, - nullable=False) + id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True) user_id = sqlalchemy.Column( sqlalchemy.String(36), sqlalchemy.ForeignKey('user.id'), @@ -162,13 +152,47 @@ class Resource(BASE, BileanBase, SoftDelete, models.TimestampMixin): rate = sqlalchemy.Column(sqlalchemy.Float, nullable=False) +class Action(BASE, BileanBase, StateAware, models.TimestampMixin): + """Action objects.""" + + __tablename__ = 'action' + id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True, + default=lambda: UUID4()) + name = sqlalchemy.Column(sqlalchemy.String(63)) + context = sqlalchemy.Column(types.Dict) + target = sqlalchemy.Column(sqlalchemy.String(36)) + action = sqlalchemy.Column(sqlalchemy.String(255)) + cause = sqlalchemy.Column(sqlalchemy.String(255)) + owner = sqlalchemy.Column(sqlalchemy.String(36)) + start_time = sqlalchemy.Column(sqlalchemy.Float(precision='24,8')) + end_time = sqlalchemy.Column(sqlalchemy.Float(precision='24,8')) + timeout = sqlalchemy.Column(sqlalchemy.Integer) + inputs = sqlalchemy.Column(types.Dict) + outputs = sqlalchemy.Column(types.Dict) + data = sqlalchemy.Column(types.Dict) + + +class ActionDependency(BASE, BileanBase): + """Action dependencies.""" + __tablename__ = 'dependency' + + id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True, + default=lambda: UUID4()) + depended = sqlalchemy.Column(sqlalchemy.String(36), + sqlalchemy.ForeignKey('action.id'), + nullable=False) + dependent = sqlalchemy.Column(sqlalchemy.String(36), + sqlalchemy.ForeignKey('action.id'), + nullable=False) + + class Event(BASE, BileanBase, SoftDelete): """Represents an event generated by the bilean engine.""" __tablename__ = 'event' id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True, - default=lambda: UUID4(), unique=True) + default=lambda: UUID4()) user_id = sqlalchemy.Column(sqlalchemy.String(36), sqlalchemy.ForeignKey('user.id'), nullable=False) @@ -184,11 +208,10 @@ class Job(BASE, BileanBase): __tablename__ = 'job' - id = sqlalchemy.Column(sqlalchemy.String(50), primary_key=True, - unique=True) + id = sqlalchemy.Column(sqlalchemy.String(50), primary_key=True) scheduler_id = 
sqlalchemy.Column(sqlalchemy.String(36)) job_type = sqlalchemy.Column(sqlalchemy.String(10)) - parameters = sqlalchemy.Column(types.Dict()) + parameters = sqlalchemy.Column(types.Dict) class UserLock(BASE, BileanBase): @@ -196,6 +219,19 @@ class UserLock(BASE, BileanBase): __tablename__ = 'user_lock' - user_id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True, - nullable=False) - engine_id = sqlalchemy.Column(sqlalchemy.String(36)) + user_id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True) + action_id = sqlalchemy.Column(sqlalchemy.String(36)) + + +class Service(BASE, BileanBase, models.TimestampMixin): + """Service registry.""" + + __tablename__ = 'service' + + id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True, + default=lambda: UUID4()) + host = sqlalchemy.Column(sqlalchemy.String(255)) + binary = sqlalchemy.Column(sqlalchemy.String(255)) + topic = sqlalchemy.Column(sqlalchemy.String(255)) + disabled = sqlalchemy.Column(sqlalchemy.Boolean, default=False) + disabled_reason = sqlalchemy.Column(sqlalchemy.String(255)) diff --git a/bilean/engine/actions/__init__.py b/bilean/engine/actions/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/bilean/engine/actions/base.py b/bilean/engine/actions/base.py new file mode 100644 index 0000000..61e0857 --- /dev/null +++ b/bilean/engine/actions/base.py @@ -0,0 +1,398 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import six +import time + +from oslo_config import cfg +from oslo_log import log as logging +from oslo_utils import timeutils + +from bilean.common import context as req_context +from bilean.common import exception +from bilean.common.i18n import _ +from bilean.common.i18n import _LE +from bilean.db import api as db_api + +wallclock = time.time +LOG = logging.getLogger(__name__) + +# Action causes +CAUSES = ( + CAUSE_RPC, CAUSE_DERIVED, +) = ( + 'RPC Request', + 'Derived Action', +) + + +class Action(object): + '''An action can be performed on a user, rule or policy.''' + + RETURNS = ( + RES_OK, RES_ERROR, RES_RETRY, RES_CANCEL, RES_TIMEOUT, + ) = ( + 'OK', 'ERROR', 'RETRY', 'CANCEL', 'TIMEOUT', + ) + + # Action status definitions: + # INIT: Not ready to be executed because fields are being modified, + # or dependency with other actions are being analyzed. + # READY: Initialized and ready to be executed by a worker. + # RUNNING: Being executed by a worker thread. + # SUCCEEDED: Completed with success. + # FAILED: Completed with failure. + # CANCELLED: Action cancelled because worker thread was cancelled. 
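+    #  WAITING: Waiting for depended actions to complete before turning
+    #           READY.
+    #  SUSPENDED: Suspended by a SUSPEND signal; may be resumed later.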
+ STATUSES = ( + INIT, WAITING, READY, RUNNING, SUSPENDED, + SUCCEEDED, FAILED, CANCELLED + ) = ( + 'INIT', 'WAITING', 'READY', 'RUNNING', 'SUSPENDED', + 'SUCCEEDED', 'FAILED', 'CANCELLED', + ) + + # Signal commands + COMMANDS = ( + SIG_CANCEL, SIG_SUSPEND, SIG_RESUME, + ) = ( + 'CANCEL', 'SUSPEND', 'RESUME', + ) + + def __new__(cls, target, action, context, **kwargs): + if (cls != Action): + return super(Action, cls).__new__(cls) + + target_type = action.split('_')[0] + if target_type == 'USER': + from bilean.engine.actions import user_action + ActionClass = user_action.UserAction + # elif target_type == 'RULE': + # from bilean.engine.actions import rule_action + # ActionClass = rule_action.RuleAction + # elif target_type == 'POLICY': + # from bilean.engine.actions import policy_action + # ActionClass = policy_action.PolicyAction + + return super(Action, cls).__new__(ActionClass) + + def __init__(self, target, action, context, **kwargs): + # context will be persisted into database so that any worker thread + # can pick the action up and execute it on behalf of the initiator + + self.id = kwargs.get('id', None) + self.name = kwargs.get('name', '') + + self.context = context + + self.action = action + self.target = target + + # Why this action is fired, it can be a UUID of another action + self.cause = kwargs.get('cause', '') + + # Owner can be an UUID format ID for the worker that is currently + # working on the action. It also serves as a lock. + self.owner = kwargs.get('owner', None) + + self.start_time = kwargs.get('start_time', None) + self.end_time = kwargs.get('end_time', None) + + # Timeout is a placeholder in case some actions may linger too long + self.timeout = kwargs.get('timeout', cfg.CONF.default_action_timeout) + + # Return code, useful when action is not automatically deleted + # after execution + self.status = kwargs.get('status', self.INIT) + self.status_reason = kwargs.get('status_reason', '') + + # All parameters are passed in using keyword arguments which is + # a dictionary stored as JSON in DB + self.inputs = kwargs.get('inputs', {}) + self.outputs = kwargs.get('outputs', {}) + + self.created_at = kwargs.get('created_at', None) + self.updated_at = kwargs.get('updated_at', None) + + self.data = kwargs.get('data', {}) + + def store(self, context): + """Store the action record into database table. + + :param context: An instance of the request context. + :return: The ID of the stored object. + """ + + timestamp = timeutils.utcnow() + + values = { + 'name': self.name, + 'context': self.context.to_dict(), + 'target': self.target, + 'action': self.action, + 'cause': self.cause, + 'owner': self.owner, + 'start_time': self.start_time, + 'end_time': self.end_time, + 'timeout': self.timeout, + 'status': self.status, + 'status_reason': self.status_reason, + 'inputs': self.inputs, + 'outputs': self.outputs, + 'created_at': self.created_at, + 'updated_at': self.updated_at, + 'data': self.data, + } + + if self.id: + self.updated_at = timestamp + values['updated_at'] = timestamp + db_api.action_update(context, self.id, values) + else: + self.created_at = timestamp + values['created_at'] = timestamp + action = db_api.action_create(context, values) + self.id = action.id + + return self.id + + @classmethod + def _from_db_record(cls, record): + """Construct a action object from database record. + + :param context: the context used for DB operations; + :param record: a DB action object that contains all fields. + :return: An `Action` object deserialized from the DB action object. 
+ """ + context = req_context.RequestContext.from_dict(record.context) + kwargs = { + 'id': record.id, + 'name': record.name, + 'cause': record.cause, + 'owner': record.owner, + 'start_time': record.start_time, + 'end_time': record.end_time, + 'timeout': record.timeout, + 'status': record.status, + 'status_reason': record.status_reason, + 'inputs': record.inputs or {}, + 'outputs': record.outputs or {}, + 'created_at': record.created_at, + 'updated_at': record.updated_at, + 'data': record.data, + } + + return cls(record.target, record.action, context, **kwargs) + + @classmethod + def load(cls, context, action_id=None, db_action=None): + """Retrieve an action from database. + + :param context: Instance of request context. + :param action_id: An UUID for the action to deserialize. + :param db_action: An action object for the action to deserialize. + :return: A `Action` object instance. + """ + if db_action is None: + db_action = db_api.action_get(context, action_id) + if db_action is None: + raise exception.ActionNotFound(action=action_id) + + return cls._from_db_record(db_action) + + @classmethod + def load_all(cls, context, filters=None, limit=None, marker=None, + sort_keys=None, sort_dir=None): + """Retrieve all actions from database.""" + + records = db_api.action_get_all(context, filters=filters, + limit=limit, marker=marker, + sort_keys=sort_keys, + sort_dir=sort_dir) + + for record in records: + yield cls._from_db_record(record) + + @classmethod + def create(cls, context, target, action, **kwargs): + """Create an action object. + + :param context: The requesting context. + :param target: The ID of the target. + :param action: Name of the action. + :param dict kwargs: Other keyword arguments for the action. + :return: ID of the action created. + """ + params = { + 'user': context.user, + 'project': context.project, + 'domain': context.domain, + 'is_admin': context.is_admin, + 'request_id': context.request_id, + 'trusts': context.trusts, + } + ctx = req_context.RequestContext.from_dict(params) + obj = cls(target, action, ctx, **kwargs) + return obj.store(context) + + @classmethod + def delete(cls, context, action_id): + """Delete an action from database.""" + db_api.action_delete(context, action_id) + + def signal(self, cmd): + '''Send a signal to the action.''' + if cmd not in self.COMMANDS: + return + + if cmd == self.SIG_CANCEL: + expected_statuses = (self.INIT, self.WAITING, self.READY, + self.RUNNING) + elif cmd == self.SIG_SUSPEND: + expected_statuses = (self.RUNNING) + else: # SIG_RESUME + expected_statuses = (self.SUSPENDED) + + if self.status not in expected_statuses: + msg = _LE("Action (%(action)s) is in unexpected status " + "(%(actual)s) while expected status should be one of " + "(%(expected)s).") % dict(action=self.id, + expected=expected_statuses, + actual=self.status) + LOG.error(msg) + return + + db_api.action_signal(self.context, self.id, cmd) + + def execute(self, **kwargs): + '''Execute the action. + + In theory, the action encapsulates all information needed for + execution. 'kwargs' may specify additional parameters. + :param kwargs: additional parameters that may override the default + properties stored in the action record. 
+ ''' + return NotImplemented + + def set_status(self, result, reason=None): + """Set action status based on return value from execute.""" + + timestamp = wallclock() + + if result == self.RES_OK: + status = self.SUCCEEDED + db_api.action_mark_succeeded(self.context, self.id, timestamp) + + elif result == self.RES_ERROR: + status = self.FAILED + db_api.action_mark_failed(self.context, self.id, timestamp, + reason=reason or 'ERROR') + + elif result == self.RES_TIMEOUT: + status = self.FAILED + db_api.action_mark_failed(self.context, self.id, timestamp, + reason=reason or 'TIMEOUT') + + elif result == self.RES_CANCEL: + status = self.CANCELLED + db_api.action_mark_cancelled(self.context, self.id, timestamp) + + else: # result == self.RES_RETRY: + status = self.READY + # Action failed at the moment, but can be retried + # We abandon it and then notify other dispatchers to execute it + db_api.action_abandon(self.context, self.id) + + self.status = status + self.status_reason = reason + + def get_status(self): + timestamp = wallclock() + status = db_api.action_check_status(self.context, self.id, timestamp) + self.status = status + return status + + def is_timeout(self): + time_lapse = wallclock() - self.start_time + return time_lapse > self.timeout + + def _check_signal(self): + # Check timeout first, if true, return timeout message + if self.timeout is not None and self.is_timeout(): + return self.RES_TIMEOUT + + result = db_api.action_signal_query(self.context, self.id) + return result + + def is_cancelled(self): + return self._check_signal() == self.SIG_CANCEL + + def is_suspended(self): + return self._check_signal() == self.SIG_SUSPEND + + def is_resumed(self): + return self._check_signal() == self.SIG_RESUME + + def to_dict(self): + if self.id: + dep_on = db_api.dependency_get_depended(self.context, self.id) + dep_by = db_api.dependency_get_dependents(self.context, self.id) + else: + dep_on = [] + dep_by = [] + action_dict = { + 'id': self.id, + 'name': self.name, + 'action': self.action, + 'target': self.target, + 'cause': self.cause, + 'owner': self.owner, + 'start_time': self.start_time, + 'end_time': self.end_time, + 'timeout': self.timeout, + 'status': self.status, + 'status_reason': self.status_reason, + 'inputs': self.inputs, + 'outputs': self.outputs, + 'depends_on': dep_on, + 'depended_by': dep_by, + 'created_at': self.created_at, + 'updated_at': self.updated_at, + 'data': self.data, + } + return action_dict + + +def ActionProc(context, action_id): + '''Action process.''' + + action = Action.load(context, action_id=action_id) + if action is None: + LOG.error(_LE('Action "%s" could not be found.'), action_id) + return False + + reason = 'Action completed' + success = True + try: + result, reason = action.execute() + except Exception as ex: + result = action.RES_ERROR + reason = six.text_type(ex) + LOG.exception(_('Unexpected exception occurred during action ' + '%(action)s (%(id)s) execution: %(reason)s'), + {'action': action.action, 'id': action.id, + 'reason': reason}) + success = False + finally: + # NOTE: locks on action is eventually released here by status update + action.set_status(result, reason) + + return success diff --git a/bilean/engine/actions/user_action.py b/bilean/engine/actions/user_action.py new file mode 100644 index 0000000..fbce963 --- /dev/null +++ b/bilean/engine/actions/user_action.py @@ -0,0 +1,153 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import six
+
+from bilean.common import exception
+from bilean.common.i18n import _
+from bilean.common.i18n import _LE
+from bilean.common.i18n import _LI
+from bilean.engine.actions import base
+from bilean.engine.flows import flow as bilean_flow
+from bilean.engine import lock as bilean_lock
+from bilean.resources import base as resource_base
+
+from oslo_log import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+class UserAction(base.Action):
+    """An action that can be performed on a user."""
+
+    ACTIONS = (
+        USER_CREATE_RESOURCE, USER_UPDATE_RESOURCE, USER_DELETE_RESOURCE,
+        USER_SETTLE_ACCOUNT,
+    ) = (
+        'USER_CREATE_RESOURCE', 'USER_UPDATE_RESOURCE', 'USER_DELETE_RESOURCE',
+        'USER_SETTLE_ACCOUNT',
+    )
+
+    def do_create_resource(self):
+        resource = resource_base.Resource.from_dict(self.inputs)
+        try:
+            flow_engine = bilean_flow.get_flow(self.context,
+                                               resource,
+                                               'create')
+            with bilean_flow.DynamicLogListener(flow_engine, logger=LOG):
+                flow_engine.run()
+        except Exception as ex:
+            LOG.error(_LE("Failed to execute action(%(action_id)s), error: "
+                          "%(error_msg)s"), {"action_id": self.id,
+                                             "error_msg": six.text_type(ex)})
+            return self.RES_ERROR, _('Resource creation failed.')
+
+        return self.RES_OK, _('Resource created successfully.')
+
+    def do_update_resource(self):
+        try:
+            resource_id = self.inputs.get('id')
+            resource = resource_base.Resource.load(
+                self.context, resource_id=resource_id)
+        except exception.ResourceNotFound:
+            LOG.error(_LE('The resource (%s) to be updated was not found.'),
+                      resource_id)
+            return self.RES_ERROR, _('Resource not found.')
+
+        try:
+            flow_engine = bilean_flow.get_flow(self.context,
+                                               resource,
+                                               'update')
+            with bilean_flow.DynamicLogListener(flow_engine, logger=LOG):
+                flow_engine.run()
+        except Exception as ex:
+            LOG.error(_LE("Failed to execute action(%(action_id)s), error: "
+                          "%(error_msg)s"), {"action_id": self.id,
+                                             "error_msg": six.text_type(ex)})
+            return self.RES_ERROR, _('Resource update failed.')
+
+        LOG.info(_LI('Successfully updated resource: %s'), resource.id)
+        return self.RES_OK, _('Resource updated successfully.')
+
+    def do_delete_resource(self):
+        try:
+            resource_id = self.inputs.get('resource_id')
+            resource = resource_base.Resource.load(
+                self.context, resource_id=resource_id)
+        except exception.ResourceNotFound:
+            LOG.error(_LE('The resource (%s) to be deleted was not found.'),
+                      resource_id)
+            return self.RES_ERROR, _('Resource not found.')
+
+        try:
+            flow_engine = bilean_flow.get_flow(self.context,
+                                               resource,
+                                               'delete')
+            with bilean_flow.DynamicLogListener(flow_engine, logger=LOG):
+                flow_engine.run()
+        except Exception as ex:
+            LOG.error(_LE("Failed to execute action(%(action_id)s), error: "
+                          "%(error_msg)s"), {"action_id": self.id,
+                                             "error_msg": six.text_type(ex)})
+            return self.RES_ERROR, _('Resource deletion failed.')
+
+        LOG.info(_LI('Successfully deleted resource: %s'), resource.id)
+        return self.RES_OK, _('Resource deleted successfully.')
+
+    def do_settle_account(self):
+        try:
+            flow_engine = bilean_flow.get_settle_account_flow(
+                self.context, self.target, task=self.inputs.get('task'))
+
with bilean_flow.DynamicLogListener(flow_engine, logger=LOG): + flow_engine.run() + except Exception as ex: + LOG.error(_LE("Faied to execute action(%(action_id)s), error: " + "%(error_msg)s"), {"action_id": self.id, + "error_msg": six.text_type(ex)}) + return self.RES_ERROR, _('Settle account failed.') + + return self.RES_OK, _('Settle account successfully.') + + def _execute(self): + """Private function that finds out the handler and execute it.""" + + action_name = self.action.lower() + method_name = action_name.replace('user', 'do') + method = getattr(self, method_name, None) + + if method is None: + reason = _('Unsupported action: %s') % self.action + return self.RES_ERROR, reason + + return method() + + def execute(self, **kwargs): + """Interface function for action execution. + + :param dict kwargs: Parameters provided to the action, if any. + :returns: A tuple containing the result and the related reason. + """ + + try: + res = bilean_lock.user_lock_acquire(self.context, self.target, + self.id, self.owner) + if not res: + LOG.error(_LE('Failed grabbing the lock for user: %s'), + self.target) + res = self.RES_ERROR + reason = _('Failed in locking user') + else: + res, reason = self._execute() + finally: + bilean_lock.user_lock_release(self.target, self.id) + + return res, reason diff --git a/bilean/engine/dispatcher.py b/bilean/engine/dispatcher.py new file mode 100644 index 0000000..2f84fde --- /dev/null +++ b/bilean/engine/dispatcher.py @@ -0,0 +1,112 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_context import context as oslo_context +from oslo_log import log as logging +import oslo_messaging +from oslo_service import service + +from bilean.common import consts +from bilean.common.i18n import _LI +from bilean.common import messaging as rpc_messaging + +LOG = logging.getLogger(__name__) + +OPERATIONS = ( + START_ACTION, CANCEL_ACTION, STOP +) = ( + 'start_action', 'cancel_action', 'stop' +) + + +class Dispatcher(service.Service): + '''Listen on an AMQP queue named for the engine. + + Receive notification from engine services and schedule actions. 
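+
+    Each engine service runs its own dispatcher, listening on the shared
+    dispatcher topic with its engine_id as the server name, so a
+    notification can be addressed to one engine or broadcast to all.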
+ ''' + def __init__(self, engine_service, topic, version, thread_group_mgr): + super(Dispatcher, self).__init__() + self.TG = thread_group_mgr + self.engine_id = engine_service.engine_id + self.topic = topic + self.version = version + + def start(self): + super(Dispatcher, self).start() + self.target = oslo_messaging.Target(server=self.engine_id, + topic=self.topic, + version=self.version) + server = rpc_messaging.get_rpc_server(self.target, self) + server.start() + + def listening(self, ctxt): + '''Respond affirmatively to confirm that engine is still alive.''' + return True + + def start_action(self, ctxt, action_id=None): + self.TG.start_action(self.engine_id, action_id) + + def cancel_action(self, ctxt, action_id): + '''Cancel an action.''' + self.TG.cancel_action(action_id) + + def suspend_action(self, ctxt, action_id): + '''Suspend an action.''' + self.TG.suspend_action(action_id) + + def resume_action(self, ctxt, action_id): + '''Resume an action.''' + self.TG.resume_action(action_id) + + def stop(self): + super(Dispatcher, self).stop() + # Wait for all action threads to be finished + LOG.info(_LI("Stopping all action threads of engine %s"), + self.engine_id) + # Stop ThreadGroup gracefully + self.TG.stop(True) + LOG.info(_LI("All action threads have been finished")) + + +def notify(method, engine_id=None, **kwargs): + '''Send notification to dispatcher + + :param method: remote method to call + :param engine_id: dispatcher to notify; None implies broadcast + ''' + + client = rpc_messaging.get_rpc_client(version=consts.RPC_API_VERSION) + + if engine_id: + # Notify specific dispatcher identified by engine_id + call_context = client.prepare( + version=consts.RPC_API_VERSION, + topic=consts.ENGINE_DISPATCHER_TOPIC, + server=engine_id) + else: + # Broadcast to all disptachers + call_context = client.prepare( + version=consts.RPC_API_VERSION, + topic=consts.ENGINE_DISPATCHER_TOPIC) + + try: + # We don't use ctext parameter in action progress + # actually. But since RPCClient.call needs this param, + # we use oslo current context here. 
+ call_context.call(oslo_context.get_current(), method, **kwargs) + return True + except oslo_messaging.MessagingTimeout: + return False + + +def start_action(engine_id=None, **kwargs): + return notify(START_ACTION, engine_id, **kwargs) diff --git a/bilean/engine/event.py b/bilean/engine/event.py index b11ce7c..3db4821 100644 --- a/bilean/engine/event.py +++ b/bilean/engine/event.py @@ -113,38 +113,56 @@ class Event(object): return evt -def record(context, user_id, action=None, seconds=0, value=0): +def record(context, user, timestamp=None, action='charge', cause_resource=None, + resource_action=None, extra_cost=0, value=0): """Generate events for specify user :param context: oslo.messaging.context - :param user_id: ID of user to mark event + :param user: object user to mark event :param action: action of event, include 'charge' and 'recharge' - :param seconds: use time length, needed when action is 'charge' + :param cause_resource: object resource which triggered the action + :param resource_action: action of resource + :param extra_cost: extra cost of the resource + :param timestamp: timestamp when event occurs :param value: value of recharge, needed when action is 'recharge' """ + if timestamp is None: + timestamp = timeutils.utcnow() try: if action == 'charge': resources = resource_base.Resource.load_all( - context, user_id=user_id, project_safe=False) - + context, user_id=user.id, project_safe=False) + seconds = (timestamp - user.last_bill).total_seconds() res_mapping = {} for resource in resources: - usage = resource.rate * seconds + if cause_resource and resource.id == cause_resource.id: + if resource_action == 'create': + usage = extra_cost + elif resource_action == 'update': + usage = resource.rate * seconds + extra_cost + else: + usage = resource.rate * seconds if res_mapping.get(resource.resource_type) is None: res_mapping[resource.resource_type] = usage else: res_mapping[resource.resource_type] += usage + if resource_action == 'delete': + usage = cause_resource.rate * seconds + extra_cost + if res_mapping.get(cause_resource.resource_type) is None: + res_mapping[cause_resource.resource_type] = 0 + res_mapping[cause_resource.resource_type] += usage + for res_type in res_mapping.keys(): - event = Event(timeutils.utcnow(), - user_id=user_id, + event = Event(timestamp, + user_id=user.id, action=action, resource_type=res_type, value=res_mapping.get(res_type)) event.store(context) elif action == 'recharge': - event = Event(timeutils.utcnow(), - user_id=user_id, + event = Event(timestamp, + user_id=user.id, action=action, value=value) event.store(context) diff --git a/bilean/engine/flows/__init__.py b/bilean/engine/flows/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/bilean/engine/flows/flow.py b/bilean/engine/flows/flow.py new file mode 100644 index 0000000..fc20018 --- /dev/null +++ b/bilean/engine/flows/flow.py @@ -0,0 +1,220 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
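+
+# This module builds the TaskFlow flows used by user actions: linear flows
+# for resource create/update/delete and for settling a user's account. Most
+# tasks define a revert() that restores the backed-up resource or user state
+# when a later task in the flow fails.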
+ +import os + +from oslo_log import log as logging +import taskflow.engines +from taskflow.listeners import base +from taskflow.listeners import logging as logging_listener +from taskflow.patterns import linear_flow +from taskflow import task +from taskflow.types import failure as ft + +from bilean.common import exception +from bilean.common.i18n import _LE +from bilean.engine import policy as policy_mod +from bilean.engine import user as user_mod +from bilean.resources import base as resource_base +from bilean.rules import base as rule_base +from bilean import scheduler as bilean_scheduler + +LOG = logging.getLogger(__name__) + + +class DynamicLogListener(logging_listener.DynamicLoggingListener): + """This is used to attach to taskflow engines while they are running. + + It provides a bunch of useful features that expose the actions happening + inside a taskflow engine, which can be useful for developers for debugging, + for operations folks for monitoring and tracking of the resource actions + and more... + """ + + #: Exception is an excepted case, don't include traceback in log if fails. + _NO_TRACE_EXCEPTIONS = (exception.InvalidInput) + + def __init__(self, engine, + task_listen_for=base.DEFAULT_LISTEN_FOR, + flow_listen_for=base.DEFAULT_LISTEN_FOR, + retry_listen_for=base.DEFAULT_LISTEN_FOR, + logger=LOG): + super(DynamicLogListener, self).__init__( + engine, + task_listen_for=task_listen_for, + flow_listen_for=flow_listen_for, + retry_listen_for=retry_listen_for, + log=logger) + + def _format_failure(self, fail): + if fail.check(*self._NO_TRACE_EXCEPTIONS) is not None: + exc_info = None + exc_details = '%s%s' % (os.linesep, fail.pformat(traceback=False)) + return (exc_info, exc_details) + else: + return super(DynamicLogListener, self)._format_failure(fail) + + +class CreateResourceTask(task.Task): + """Create resource and store to db.""" + + def execute(self, context, resource, **kwargs): + user = user_mod.User.load(context, user_id=resource.user_id) + user_policy = policy_mod.Policy.load(context, policy_id=user.policy_id) + rule = user_policy.find_rule(context, resource.resource_type) + + # Update resource with rule_id and rate + resource.rule_id = rule.id + resource.rate = rule.get_price(resource) + resource.store(context) + + def revert(self, context, resource, result, **kwargs): + if isinstance(result, ft.Failure): + LOG.error(_LE("Error when creating resource: %s"), + resource.to_dict()) + return + + resource.delete(context, soft_delete=False) + + +class UpdateResourceTask(task.Task): + """Update resource.""" + + def execute(self, context, resource, values, resource_bak, **kwargs): + old_rate = resource.rate + resource.properties = values.get('properties') + rule = rule_base.Rule.load(context, rule_id=resource.rule_id) + resource.rate = rule.get_price(resource) + resource.d_rate = resource.rate - old_rate + resource.store(context) + + def revert(self, context, resource, resource_bak, result, **kwargs): + if isinstance(result, ft.Failure): + LOG.error(_LE("Error when updating resource: %s"), resource.id) + return + + # restore resource + res = resource_base.Resource.from_dict(resource_bak) + res.store(context) + + +class DeleteResourceTask(task.Task): + """Delete resource from db.""" + + def execute(self, context, resource, **kwargs): + resource.delete(context) + + def revert(self, context, resource, result, **kwargs): + if isinstance(result, ft.Failure): + LOG.error(_LE("Error when deleting resource: %s"), resource.id) + return + + resource.deleted_at = None + 
resource.store(context) + + +class LoadUserTask(task.Task): + """Load user from db.""" + + default_provides = set(['user_bak', 'user_obj']) + + def execute(self, context, user_id, **kwargs): + user_obj = user_mod.User.load(context, user_id=user_id) + return { + 'user_bak': user_obj.to_dict(), + 'user_obj': user_obj, + } + + +class SettleAccountTask(task.Task): + def execute(self, context, user_obj, user_bak, task, **kwargs): + user_obj.settle_account(context, task=task) + + def revert(self, context, user_bak, result, **kwargs): + if isinstance(result, ft.Failure): + LOG.error(_LE("Error when settling account for user: %s"), + user_bak.get('id')) + return + + # Restore user + user = user_mod.User.from_dict(user_bak) + user.store(context) + + +class UpdateUserWithResourceTask(task.Task): + """Update user with resource actions.""" + + def execute(self, context, user_obj, user_bak, resource, + resource_action, **kwargs): + user_obj.update_with_resource(context, resource, + resource_action=resource_action) + + def revert(self, context, user_bak, result, **kwargs): + if isinstance(result, ft.Failure): + LOG.error(_LE("Error when updating user: %s"), user_bak.get('id')) + return + + # Restore user + user = user_mod.User.from_dict(user_bak) + user.store(context) + + +class UpdateUserJobsTask(task.Task): + """Update user jobs.""" + + def execute(self, user_obj, **kwargs): + res = bilean_scheduler.notify(bilean_scheduler.UPDATE_JOBS, + user=user_obj.to_dict()) + if not res: + LOG.error(_LE("Error when updating user jobs: %s"), user_obj.id) + raise + + +def get_settle_account_flow(context, user_id, task=None): + """Constructs and returns settle account task flow.""" + + flow_name = user_id + '_settle_account' + flow = linear_flow.Flow(flow_name) + kwargs = { + 'context': context, + 'user_id': user_id, + 'task': task, + } + flow.add(LoadUserTask(), + SettleAccountTask()) + if task != 'freeze': + flow.add(UpdateUserJobsTask()) + return taskflow.engines.load(flow, store=kwargs) + + +def get_flow(context, resource, resource_action): + """Constructs and returns resource task flow.""" + + flow_name = resource.user_id + '_' + resource_action + '_resource' + flow = linear_flow.Flow(flow_name) + kwargs = { + 'context': context, + 'user_id': resource.user_id, + 'resource': resource, + 'resource_action': resource_action, + } + if resource_action == 'create': + flow.add(CreateResourceTask()) + if resource_action == 'update': + flow.add(UpdateResourceTask()) + kwargs['resource_bak'] = resource.to_dict() + elif resource_action == 'delete': + flow.add(DeleteResourceTask()) + flow.add(LoadUserTask(), + UpdateUserWithResourceTask(), + UpdateUserJobsTask()) + return taskflow.engines.load(flow, store=kwargs) diff --git a/bilean/engine/lock.py b/bilean/engine/lock.py index fc0917f..40c9ee9 100644 --- a/bilean/engine/lock.py +++ b/bilean/engine/lock.py @@ -13,8 +13,12 @@ import eventlet from oslo_config import cfg from oslo_log import log as logging +from oslo_utils import timeutils +import time from bilean.common.i18n import _ +from bilean.common.i18n import _LE +from bilean.common.i18n import _LI from bilean.db import api as db_api CONF = cfg.CONF @@ -25,22 +29,40 @@ CONF.import_opt('lock_retry_interval', 'bilean.common.config') LOG = logging.getLogger(__name__) +def is_engine_dead(ctx, engine_id, period_time=None): + # if engine didn't report its status for peirod_time, will consider it + # as a dead engine. 
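+    # The liveness signal is the `updated_at` timestamp of the engine's
+    # service record, which service_update() refreshes on every call.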
+ if period_time is None: + period_time = 2 * CONF.periodic_interval + engine = db_api.service_get(ctx, engine_id) + if not engine: + return True + if (timeutils.utcnow() - engine.updated_at).total_seconds() > period_time: + return True + return False + + def sleep(sleep_time): '''Interface for sleeping.''' eventlet.sleep(sleep_time) -def user_lock_acquire(user_id, engine_id): +def user_lock_acquire(context, user_id, action_id, engine=None, + forced=False): """Try to lock the specified user. + :param context: the context used for DB operations; :param user_id: ID of the user to be locked. - :param engine_id: ID of the engine which wants to lock the user. + :param action_id: ID of the action that attempts to lock the user. + :param engine: ID of the engine that attempts to lock the user. + :param forced: set to True to cancel current action that owns the lock, + if any. :returns: True if lock is acquired, or False otherwise. """ - user_lock = db_api.user_lock_acquire(user_id, engine_id) - if user_lock: + owner = db_api.user_lock_acquire(user_id, action_id) + if action_id == owner: return True retries = cfg.CONF.lock_retry_times @@ -49,18 +71,40 @@ def user_lock_acquire(user_id, engine_id): while retries > 0: sleep(retry_interval) LOG.debug(_('Acquire lock for user %s again'), user_id) - user_lock = db_api.user_lock_acquire(user_id, engine_id) - if user_lock: + owner = db_api.user_lock_acquire(user_id, action_id) + if action_id == owner: return True retries = retries - 1 + if forced: + owner = db_api.user_lock_steal(user_id, action_id) + return action_id == owner + + action = db_api.action_get(context, owner) + if (action and action.owner and action.owner != engine and + is_engine_dead(context, action.owner)): + LOG.info(_LI('The user %(u)s is locked by dead action %(a)s, ' + 'try to steal the lock.'), { + 'u': user_id, + 'a': owner + }) + reason = _('Engine died when executing this action.') + db_api.action_mark_failed(context, action.id, time.time(), + reason=reason) + db_api.user_lock_steal(user_id, action_id) + return True + + LOG.error(_LE('User is already locked by action %(old)s, ' + 'action %(new)s failed grabbing the lock'), + {'old': owner, 'new': action_id}) + return False -def user_lock_release(user_id, engine_id=None): +def user_lock_release(user_id, action_id): """Release the lock on the specified user. :param user_id: ID of the user to be released. - :param engine_id: ID of the engine which locked the user. + :param action_id: ID of the action which locked the user. 
""" - return db_api.user_lock_release(user_id, engine_id=engine_id) + return db_api.user_lock_release(user_id, action_id) diff --git a/bilean/engine/service.py b/bilean/engine/service.py index 78b1d71..7d33959 100644 --- a/bilean/engine/service.py +++ b/bilean/engine/service.py @@ -13,11 +13,14 @@ import functools import six -import socket +import time +import eventlet +from oslo_config import cfg from oslo_log import log as logging import oslo_messaging from oslo_service import service +from oslo_service import threadgroup from bilean.common import consts from bilean.common import context as bilean_context @@ -28,9 +31,11 @@ from bilean.common.i18n import _LI from bilean.common import messaging as rpc_messaging from bilean.common import schema from bilean.common import utils +from bilean.db import api as db_api +from bilean.engine.actions import base as action_mod +from bilean.engine import dispatcher from bilean.engine import environment from bilean.engine import event as event_mod -from bilean.engine import lock as bilean_lock from bilean.engine import policy as policy_mod from bilean.engine import user as user_mod from bilean.resources import base as resource_base @@ -53,6 +58,112 @@ def request_context(func): return wrapped +class ThreadGroupManager(object): + '''Thread group manager.''' + + def __init__(self): + super(ThreadGroupManager, self).__init__() + self.workers = {} + self.group = threadgroup.ThreadGroup() + + # Create dummy service task, because when there is nothing queued + # on self.tg the process exits + self.add_timer(cfg.CONF.periodic_interval, self._service_task) + + self.db_session = bilean_context.get_admin_context() + + def _service_task(self): + '''Dummy task which gets queued on the service.Service threadgroup. + + Without this service.Service sees nothing running i.e has nothing to + wait() on, so the process exits. + + ''' + pass + + def start(self, func, *args, **kwargs): + '''Run the given method in a thread.''' + + return self.group.add_thread(func, *args, **kwargs) + + def start_action(self, worker_id, action_id=None): + '''Run the given action in a sub-thread. + + Release the action lock when the thread finishes? + + :param workder_id: ID of the worker thread; we fake workers using + bilean engines at the moment. + :param action_id: ID of the action to be executed. None means the + 1st ready action will be scheduled to run. 
+ ''' + def release(thread, action_id): + '''Callback function that will be passed to GreenThread.link().''' + # Remove action thread from thread list + self.workers.pop(action_id) + + timestamp = time.time() + if action_id is not None: + action = db_api.action_acquire(self.db_session, action_id, + worker_id, timestamp) + else: + action = db_api.action_acquire_first_ready(self.db_session, + worker_id, + timestamp) + if not action: + return + + th = self.start(action_mod.ActionProc, self.db_session, action.id) + self.workers[action.id] = th + th.link(release, action.id) + return th + + def cancel_action(self, action_id): + '''Cancel an action execution progress.''' + action = action_mod.Action.load(self.db_session, action_id) + action.signal(action.SIG_CANCEL) + + def suspend_action(self, action_id): + '''Suspend an action execution progress.''' + action = action_mod.Action.load(self.db_session, action_id) + action.signal(action.SIG_SUSPEND) + + def resume_action(self, action_id): + '''Resume an action execution progress.''' + action = action_mod.Action.load(self.db_session, action_id) + action.signal(action.SIG_RESUME) + + def add_timer(self, interval, func, *args, **kwargs): + '''Define a periodic task to be run in the thread group. + + The task will be executed in a separate green thread. + Interval is from cfg.CONF.periodic_interval + ''' + + self.group.add_timer(interval, func, *args, **kwargs) + + def stop_timers(self): + self.group.stop_timers() + + def stop(self, graceful=False): + '''Stop any active threads belong to this threadgroup.''' + # Try to stop all threads gracefully + self.group.stop(graceful) + self.group.wait() + + # Wait for link()ed functions (i.e. lock release) + threads = self.group.threads[:] + links_done = dict((th, False) for th in threads) + + def mark_done(gt, th): + links_done[th] = True + + for th in threads: + th.link(mark_done, th) + + while not all(links_done.values()): + eventlet.sleep() + + class EngineService(service.Service): """Manages the running instances from creation to destruction. @@ -68,13 +179,36 @@ class EngineService(service.Service): super(EngineService, self).__init__() self.host = host self.topic = topic + self.dispatcher_topic = consts.ENGINE_DISPATCHER_TOPIC self.engine_id = None + self.TG = None self.target = None self._rpc_server = None + def _init_service(self): + admin_context = bilean_context.get_admin_context() + srv = db_api.service_get_by_host_and_binary(admin_context, + self.host, + 'bilean-engine') + if srv is None: + srv = db_api.service_create(admin_context, + host=self.host, + binary='bilean-engine', + topic=self.topic) + self.engine_id = srv.id + def start(self): - self.engine_id = socket.gethostname() + self._init_service() + self.TG = ThreadGroupManager() + + # create a dispatcher RPC service for this engine. 
+ self.dispatcher = dispatcher.Dispatcher(self, + self.dispatcher_topic, + consts.RPC_API_VERSION, + self.TG) + LOG.info(_LI("Starting dispatcher for engine %s"), self.engine_id) + self.dispatcher.start() LOG.info(_LI("Starting rpc server for engine: %s"), self.engine_id) target = oslo_messaging.Target(version=consts.RPC_API_VERSION, @@ -84,6 +218,9 @@ class EngineService(service.Service): self._rpc_server = rpc_messaging.get_rpc_server(target, self) self._rpc_server.start() + self.TG.add_timer(cfg.CONF.periodic_interval, + self.service_manage_report) + super(EngineService, self).start() def _stop_rpc_server(self): @@ -99,8 +236,22 @@ class EngineService(service.Service): def stop(self): self._stop_rpc_server() + + # Notify dispatcher to stop all action threads it started. + LOG.info(_LI("Stopping dispatcher for engine %s"), self.engine_id) + self.dispatcher.stop() + + self.TG.stop() super(EngineService, self).stop() + def service_manage_report(self): + admin_context = bilean_context.get_admin_context() + try: + db_api.service_update(admin_context, self.engine_id) + except Exception as ex: + LOG.error(_LE('Service %(id)s update failed: %(error)s'), + {'id': self.engine_id, 'error': six.text_type(ex)}) + @request_context def user_list(self, cnxt, show_deleted=False, limit=None, marker=None, sort_keys=None, sort_dir=None, @@ -136,19 +287,12 @@ class EngineService(service.Service): @request_context def user_recharge(self, cnxt, user_id, value): """Do recharge for specify user.""" - res = bilean_lock.user_lock_acquire(user_id, self.engine_id) - if not res: - LOG.error(_LE('Failed grabbing the lock for user %s'), res.user_id) - return False - try: - user = user_mod.User.load(cnxt, user_id=user_id) - user.do_recharge(cnxt, value) - # As user has been updated, the billing job for the user - # should to be updated too. - bilean_scheduler.notify(bilean_scheduler.UPDATE_JOBS, - user=user.to_dict()) - finally: - bilean_lock.user_lock_release(user_id, engine_id=self.engine_id) + user = user_mod.User.load(cnxt, user_id=user_id) + user.do_recharge(cnxt, value) + # As user has been updated, the billing job for the user + # should to be updated too. + bilean_scheduler.notify(bilean_scheduler.UPDATE_JOBS, + user=user.to_dict()) return user.to_dict() @@ -176,14 +320,8 @@ class EngineService(service.Service): 'policy': policy_id} raise exception.BileanBadRequest(msg=msg) - res = bilean_lock.user_lock_acquire(user_id, self.engine_id) - if not res: - LOG.error(_LE('Failed grabbing the lock for user %s'), res.user_id) - return False user.policy_id = policy_id user.store(cnxt) - bilean_lock.user_lock_release(user_id, engine_id=self.engine_id) - return user.to_dict() @request_context @@ -274,44 +412,23 @@ class EngineService(service.Service): def resource_create(self, cnxt, resource_id, user_id, resource_type, properties): - """Create resource by given database + """Create resource by given data.""" - Cause new resource would update user's rate, user update and billing - would be done. 
- - """ resource = resource_base.Resource(resource_id, user_id, resource_type, properties) - # Find the exact rule of resource - admin_context = bilean_context.get_admin_context() - user = user_mod.User.load(admin_context, user_id=user_id) - user_policy = policy_mod.Policy.load( - admin_context, policy_id=user.policy_id) - rule = user_policy.find_rule(admin_context, resource_type) - # Update resource with rule_id and rate - resource.rule_id = rule.id - resource.rate = rule.get_price(resource) + params = { + 'name': 'create_resource_%s' % resource_id, + 'cause': action_mod.CAUSE_RPC, + 'status': action_mod.Action.READY, + 'inputs': resource.to_dict(), + } - # Update user with resource - res = bilean_lock.user_lock_acquire(user.id, self.engine_id) - if not res: - LOG.error(_LE('Failed grabbing the lock for user %s'), user.id) - return - try: - # Reload user to ensure the info is latest. - user = user_mod.User.load(admin_context, user_id=user_id) - user.update_with_resource(admin_context, resource) - resource.store(admin_context) - - # As the rate of user has changed, the billing job for the user - # should change too. - bilean_scheduler.notify(bilean_scheduler.UPDATE_JOBS, - user=user.to_dict()) - finally: - bilean_lock.user_lock_release(user.id, engine_id=self.engine_id) - - return resource.to_dict() + action_id = action_mod.Action.create(cnxt, user_id, + consts.USER_CREATE_RESOURCE, + **params) + dispatcher.start_action(action_id=action_id) + LOG.info(_LI('Resource create action queued: %s'), action_id) @request_context def resource_list(self, cnxt, user_id=None, limit=None, marker=None, @@ -336,59 +453,44 @@ class EngineService(service.Service): resource = resource_base.Resource.load(cnxt, resource_id=resource_id) return resource.to_dict() - def resource_update(self, cnxt, resource): + def resource_update(self, cnxt, user_id, resource): """Do resource update.""" - admin_context = bilean_context.get_admin_context() - res = resource_base.Resource.load( - admin_context, resource_id=resource['id']) - old_rate = res.rate - res.properties = resource['properties'] - rule = rule_base.Rule.load(admin_context, rule_id=res.rule_id) - res.rate = rule.get_price(res) - res.d_rate = res.rate - old_rate - result = bilean_lock.user_lock_acquire(res.user_id, self.engine_id) - if not result: - LOG.error(_LE('Failed grabbing the lock for user %s'), res.user_id) - return False + params = { + 'name': 'update_resource_%s' % resource.get('id'), + 'cause': action_mod.CAUSE_RPC, + 'status': action_mod.Action.READY, + 'inputs': resource, + } + + action_id = action_mod.Action.create(cnxt, user_id, + consts.USER_UPDATE_RESOURCE, + **params) + dispatcher.start_action(action_id=action_id) + LOG.info(_LI('Resource update action queued: %s'), action_id) + + def resource_delete(self, cnxt, user_id, resource_id): + """Delete a specific resource""" + try: - user = user_mod.User.load(admin_context, res.user_id) - user.update_with_resource(admin_context, res, action='update') - - res.store(admin_context) - - bilean_scheduler.notify(bilean_scheduler.UPDATE_JOBS, - user=user.to_dict()) - finally: - bilean_lock.user_lock_release(user.id, engine_id=self.engine_id) - - return True - - def resource_delete(self, cnxt, resource_id): - """Do resource delete""" - admin_context = bilean_context.get_admin_context() - try: - res = resource_base.Resource.load( - admin_context, resource_id=resource_id) + resource_base.Resource.load(cnxt, resource_id=resource_id) except exception.ResourceNotFound: - return False + LOG.error(_LE('The 
resource (%s) to delete could not be found.'),
+                      resource_id)
+            return
 
-        result = bilean_lock.user_lock_acquire(res.user_id, self.engine_id)
-        if not result:
-            LOG.error(_LE('Failed grabbing the lock for user %s'), res.user_id)
-            return False
-        try:
-            user = user_mod.User.load(admin_context, user_id=res.user_id)
-            user.update_with_resource(admin_context, res, action='delete')
+        params = {
+            'name': 'delete_resource_%s' % resource_id,
+            'cause': action_mod.CAUSE_RPC,
+            'status': action_mod.Action.READY,
+            'inputs': {'resource_id': resource_id},
+        }
 
-            bilean_scheduler.notify(bilean_scheduler.UPDATE_JOBS,
-                                    user=user.to_dict())
-
-            res.delete(admin_context)
-        finally:
-            bilean_lock.user_lock_release(user.id, engine_id=self.engine_id)
-
-        return True
+        action_id = action_mod.Action.create(cnxt, user_id,
+                                             consts.USER_DELETE_RESOURCE,
+                                             **params)
+        dispatcher.start_action(action_id=action_id)
+        LOG.info(_LI('Resource delete action queued: %s'), action_id)
 
     @request_context
     def event_list(self, cnxt, user_id=None, limit=None, marker=None,
@@ -553,14 +655,16 @@ class EngineService(service.Service):
             policy_mod.Policy.delete(cnxt, policy_id)
 
     def settle_account(self, cnxt, user_id, task=None):
-        res = bilean_lock.user_lock_acquire(user_id, self.engine_id)
-        if not res:
-            LOG.error(_LE('Failed grabbing the lock for user %s'), user_id)
-            return
-        try:
-            user = user_mod.User.load(cnxt, user_id=user_id)
-            user.settle_account(cnxt, task=task)
-        finally:
-            bilean_lock.user_lock_release(user_id, engine_id=self.engine_id)
+        params = {
+            'name': 'settle_account_%s' % user_id,
+            'cause': action_mod.CAUSE_RPC,
+            'status': action_mod.Action.READY,
+            'inputs': {'task': task},
+        }
 
-        return user.to_dict()
+        action_id = action_mod.Action.create(cnxt, user_id,
+                                             consts.USER_SETTLE_ACCOUNT,
+                                             **params)
+        self.TG.start_action(self.engine_id, action_id=action_id)
+
+        LOG.info(_LI('User settle_account action queued: %s'), action_id)
diff --git a/bilean/engine/user.py b/bilean/engine/user.py
index d385d60..ea3388a 100644
--- a/bilean/engine/user.py
+++ b/bilean/engine/user.py
@@ -169,7 +169,8 @@ class User(object):
 
     @classmethod
     def from_dict(cls, values):
-        return cls(values.get('id'), **values)
+        id = values.pop('id', None)
+        return cls(id, **values)
 
     def to_dict(self):
         user_dict = {
@@ -194,17 +195,37 @@ class User(object):
         self.status_reason = reason
         self.store(context)
 
-    def update_with_resource(self, context, resource, action='create'):
+    def update_with_resource(self, context, resource,
+                             resource_action='create'):
         '''Update user with resource'''
-        self._settle_account(context)
-        if 'create' == action:
+        now = timeutils.utcnow()
+        extra_cost = 0
+        if 'create' == resource_action:
             d_rate = resource.rate
+            if resource.properties.get('created_at') is not None:
+                created_at = timeutils.parse_strtime(
+                    resource.properties.get('created_at'))
+                extra_seconds = (now - created_at).total_seconds()
+                extra_cost = d_rate * extra_seconds
-        elif 'delete' == action:
+        elif 'delete' == resource_action:
             d_rate = -resource.rate
+            if resource.properties.get('deleted_at') is not None:
+                deleted_at = timeutils.parse_strtime(
+                    resource.properties.get('deleted_at'))
+                extra_seconds = (now - deleted_at).total_seconds()
+                extra_cost = d_rate * extra_seconds
-        elif 'update' == action:
+        elif 'update' == resource_action:
             d_rate = resource.d_rate
+            if resource.properties.get('updated_at') is not None:
+                updated_at = timeutils.parse_strtime(
+                    resource.properties.get('updated_at'))
+                extra_seconds = (now - updated_at).total_seconds()
+                extra_cost 
= d_rate * extra_seconds + self._settle_account(context, extra_cost=extra_cost, + cause_resource=resource, + resource_action=resource_action) self._change_user_rate(context, d_rate) self.store(context) @@ -214,20 +235,18 @@ class User(object): new_rate = old_rate + d_rate if old_rate == 0 and new_rate > 0: self.last_bill = timeutils.utcnow() - if d_rate > 0 and self.status == self.FREE: self.status = self.ACTIVE elif d_rate < 0: if new_rate == 0 and self.balance >= 0: + reason = _("Status change to 'FREE' because of resource " + "deleting.") self.status = self.FREE - elif new_rate == 0 and self.balance < 0: - self.status = self.FREEZE - self.satus_reason = "Balance overdraft" - elif self.status == self.WARNING: - if not self.notify_or_not(): - reason = _("Status change from 'warning' to 'active' " - "because of resource deleting.") - self.status = self.ACTIVE - self.status_reason = reason + self.status_reason = reason + elif self.status == self.WARNING and not self.notify_or_not(): + reason = _("Status change from 'WARNING' to 'ACTIVE' " + "because of resource deleting.") + self.status = self.ACTIVE + self.status_reason = reason self.rate = new_rate def do_recharge(self, context, value): @@ -240,19 +259,19 @@ class User(object): self.status = self.FREE self.status_reason = "Recharged" elif self.status == self.FREEZE and self.balance > 0: - reason = _("Status change from 'freeze' to 'free' because " + reason = _("Status change from 'FREEZE' to 'FREE' because " "of recharge.") self.status = self.FREE self.status_reason = reason elif self.status == self.WARNING: if not self.notify_or_not(): - reason = _("Status change from 'warning' to 'active' because " + reason = _("Status change from 'WARNING' to 'ACTIVE' because " "of recharge.") self.status = self.ACTIVE self.status_reason = reason self.store(context) - event_mod.record(context, self.id, action='recharge', value=value) + event_mod.record(context, self, action='recharge', value=value) def notify_or_not(self): '''Check if user should be notified.''' @@ -269,33 +288,25 @@ class User(object): db_api.user_delete(context, self.id) return True - def _settle_account(self, context): + def _settle_account(self, context, cause_resource=None, + resource_action=None, extra_cost=0): if self.status not in [self.ACTIVE, self.WARNING]: LOG.info(_LI("Ignore settlement action because user is in '%s' " "status."), self.status) return - now = timeutils.utcnow() total_seconds = (now - self.last_bill).total_seconds() - cost = self.rate * total_seconds - if cost > 0: - self.balance -= cost - self.last_bill = now - event_mod.record(context, self.id, action='charge', - seconds=total_seconds) - - def _freeze(self, context, reason=None): - '''Freeze user when balance overdraft.''' - LOG.info(_LI("Freeze user %(user_id)s, reason: %(reason)s"), - {'user_id': self.id, 'reason': reason}) - resources = resource_base.Resource.load_all( - context, user_id=self.id, project_safe=False) - for resource in resources: - if resource.do_delete(context): - self._change_user_rate(context, -resource.rate) + cost = self.rate * total_seconds + extra_cost + self.balance -= cost + event_mod.record(context, self, timestamp=now, + cause_resource=cause_resource, + resource_action=resource_action, + extra_cost=extra_cost) + self.last_bill = now def settle_account(self, context, task=None): '''Settle account for user.''' + notifier = bilean_notifier.Notifier() self._settle_account(context) @@ -306,7 +317,17 @@ class User(object): msg = {'user': self.id, 'notification': self.status_reason} 
notifier.info('billing.notify', msg) elif task == 'freeze' and self.balance <= 0: - self._freeze(context, reason="Balance overdraft") + reason = _("Balance overdraft") + LOG.info(_LI("Freeze user %(user_id)s, reason: %(reason)s"), + {'user_id': self.id, 'reason': reason}) + resources = resource_base.Resource.load_all( + context, user_id=self.id, project_safe=False) + for resource in resources: + resource.do_delete(context) + self.rate = 0 + self.status = self.FREEZE + self.status_reason = reason + # Notify user msg = {'user': self.id, 'notification': self.status_reason} notifier.info('billing.notify', msg) diff --git a/bilean/notification/action.py b/bilean/notification/action.py index ffbdf18..85c374c 100644 --- a/bilean/notification/action.py +++ b/bilean/notification/action.py @@ -86,11 +86,15 @@ class ResourceAction(Action): def do_update(self): """Update a resource""" - return self.rpc_client.resource_update(self.cnxt, self.data) + return self.rpc_client.resource_update(self.cnxt, + self.data.pop('user_id'), + self.data) def do_delete(self): """Delete a resource""" - return self.rpc_client.resource_delete(self.cnxt, self.id) + return self.rpc_client.resource_delete(self.cnxt, + self.user_id, + self.id) class UserAction(Action): diff --git a/bilean/resources/base.py b/bilean/resources/base.py index 17fe0b8..1e3e6d5 100644 --- a/bilean/resources/base.py +++ b/bilean/resources/base.py @@ -71,9 +71,9 @@ class Resource(object): return self.id - def delete(self, context): + def delete(self, context, soft_delete=True): '''Delete resource from db.''' - db_api.resource_delete(context, self.id) + db_api.resource_delete(context, self.id, soft_delete=soft_delete) @classmethod def _from_db_record(cls, record): @@ -123,6 +123,14 @@ class Resource(object): return [cls._from_db_record(record) for record in records] + @classmethod + def from_dict(cls, values): + id = values.pop('id', None) + user_id = values.pop('user_id', None) + resource_type = values.pop('resource_type', None) + properties = values.pop('properties', {}) + return cls(id, user_id, resource_type, properties, **values) + def to_dict(self): resource_dict = { 'id': self.id, diff --git a/bilean/rpc/client.py b/bilean/rpc/client.py index 1184416..f2b1b6a 100644 --- a/bilean/rpc/client.py +++ b/bilean/rpc/client.py @@ -140,12 +140,14 @@ class EngineClient(object): resource_type=resource_type, properties=properties)) - def resource_update(self, ctxt, resource): + def resource_update(self, ctxt, user_id, resource): return self.call(ctxt, self.make_msg('resource_update', + user_id=user_id, resource=resource)) - def resource_delete(self, ctxt, resource_id): + def resource_delete(self, ctxt, user_id, resource_id): return self.call(ctxt, self.make_msg('resource_delete', + user_id=user_id, resource_id=resource_id)) # events diff --git a/bilean/scheduler/cron_scheduler.py b/bilean/scheduler/cron_scheduler.py index bef0e7b..6eccf6b 100644 --- a/bilean/scheduler/cron_scheduler.py +++ b/bilean/scheduler/cron_scheduler.py @@ -94,13 +94,9 @@ class CronScheduler(object): for job in jobs: if self._is_exist(job.id): continue - task_name = "_%s_task" % (job.job_type) - task = getattr(self, task_name) LOG.info(_LI("Add job '%(job_id)s' to scheduler '%(id)s'."), {'job_id': job.id, 'id': self.scheduler_id}) - tg_type = self.CRON if job.job_type == self.DAILY else self.DAILY - self._add_job(task, job.id, trigger_type=tg_type, - params=job.parameters) + self._add_job(job.id, job.job_type, **job.parameters) LOG.info(_LI("Initialise users from keystone.")) users = 
user_mod.User.init_users(admin_context) @@ -113,7 +109,7 @@ class CronScheduler(object): continue self._add_daily_job(user) - def _add_job(self, task, job_id, trigger_type='date', **kwargs): + def _add_job(self, job_id, task_type, **kwargs): """Add a job to scheduler by given data. :param str|unicode user_id: used as job_id @@ -123,15 +119,17 @@ class CronScheduler(object): mg_time = cfg.CONF.scheduler.misfire_grace_time job_time_zone = cfg.CONF.scheduler.time_zone user_id = job_id.split('-')[1] - if trigger_type == 'date': + trigger_type = self.CRON if task_type == self.DAILY else self.DATE + + if trigger_type == self.DATE: run_date = kwargs.get('run_date') if run_date is None: msg = "Param run_date cannot be None for trigger type 'date'." raise exception.InvalidInput(reason=msg) - self._scheduler.add_job(task, 'date', + self._scheduler.add_job(self._task, 'date', timezone=job_time_zone, run_date=run_date, - args=[user_id], + args=[user_id, task_type], id=job_id, misfire_grace_time=mg_time) return @@ -141,24 +139,14 @@ class CronScheduler(object): minute = kwargs.get('minute') if not hour or not minute: hour, minute = self._generate_timer() - self._scheduler.add_job(task, 'cron', + self._scheduler.add_job(self._task, 'cron', timezone=job_time_zone, hour=hour, minute=minute, - args=[user_id], + args=[user_id, task_type], id=job_id, misfire_grace_time=mg_time) - def _modify_job(self, job_id, **changes): - """Modifies the properties of a single job. - - Modifications are passed to this method as extra keyword arguments. - - :param str|unicode job_id: the identifier of the job - """ - - self._scheduler.modify_job(job_id, **changes) - def _remove_job(self, job_id): """Removes a job, preventing it from being run any more. @@ -177,40 +165,19 @@ class CronScheduler(object): job = self._scheduler.get_job(job_id) return job is not None - def _notify_task(self, user_id): + def _task(self, user_id, task_type): admin_context = bilean_context.get_admin_context() - user = self.rpc_client.settle_account( - admin_context, user_id, task=self.NOTIFY) - user_obj = user_mod.User.from_dict(user) - try: - db_api.job_delete( - admin_context, self._generate_job_id(user_id, self.NOTIFY)) - except exception.NotFound as e: - LOG.warn(_("Failed in deleting job: %s") % six.text_type(e)) - - self.update_jobs(user_obj) - - def _daily_task(self, user_id): - admin_context = bilean_context.get_admin_context() - user = self.rpc_client.settle_account( - admin_context, user_id, task=self.DAILY) - user_obj = user_mod.User.from_dict(user) - self.update_jobs(user_obj) - - def _freeze_task(self, user_id): - admin_context = bilean_context.get_admin_context() - user = self.rpc_client.settle_account( - admin_context, user_id, task=self.FREEZE) - user_obj = user_mod.User.from_dict(user) - try: - db_api.job_delete( - admin_context, self._generate_job_id(user_id, self.FREEZE)) - except exception.NotFound as e: - LOG.warn(_("Failed in deleting job: %s") % six.text_type(e)) - self.update_jobs(user_obj) + self.rpc_client.settle_account( + admin_context, user_id, task=task_type) + if task_type != self.DAILY: + try: + db_api.job_delete( + admin_context, self._generate_job_id(user_id, task_type)) + except exception.NotFound as e: + LOG.warn(_("Failed in deleting job: %s") % six.text_type(e)) def _add_notify_job(self, user): - if not user.rate: + if user.rate == 0: return False total_seconds = user.balance / user.rate prior_notify_time = cfg.CONF.scheduler.prior_notify_time * 3600 @@ -219,7 +186,7 @@ class CronScheduler(object): run_date = 
timeutils.utcnow() + timedelta(seconds=notify_seconds)
         job_params = {'run_date': run_date}
         job_id = self._generate_job_id(user.id, self.NOTIFY)
-        self._add_job(self._notify_task, job_id, **job_params)
+        self._add_job(job_id, self.NOTIFY, **job_params)
 
         # Save job to database
         job = {'id': job_id,
                'job_type': self.NOTIFY,
@@ -229,13 +196,13 @@
         db_api.job_create(admin_context, job)
 
     def _add_freeze_job(self, user):
-        if not user.rate:
+        if user.rate == 0:
            return False
         total_seconds = user.balance / user.rate
         run_date = timeutils.utcnow() + timedelta(seconds=total_seconds)
         job_params = {'run_date': run_date}
         job_id = self._generate_job_id(user.id, self.FREEZE)
-        self._add_job(self._freeze_task, job_id, **job_params)
+        self._add_job(job_id, self.FREEZE, **job_params)
 
         # Save job to database
         job = {'id': job_id,
                'job_type': self.FREEZE,
@@ -249,15 +216,7 @@
         job_id = self._generate_job_id(user.id, self.DAILY)
         job_params = {'hour': random.randint(0, 23),
                       'minute': random.randint(0, 59)}
-        self._add_job(self._daily_task, job_id,
-                      trigger_type='cron', **job_params)
-        # Save job to database
-        job = {'id': job_id,
-               'job_type': self.DAILY,
-               'scheduler_id': self.scheduler_id,
-               'parameters': job_params}
-        admin_context = bilean_context.get_admin_context()
-        db_api.job_create(admin_context, job)
+        self._add_job(job_id, self.DAILY, **job_params)
         return True
 
     def _generate_timer(self):