From f0d0a1403afbda04284ecdf7e3f25e4a536195dc Mon Sep 17 00:00:00 2001 From: lvdongbing Date: Mon, 11 Apr 2016 05:33:00 -0400 Subject: [PATCH] Separate consumption and recharge from event Change-Id: I77d8ae8952f8a436105277f13ca469ea7130aa97 --- bilean/common/consts.py | 36 ++- bilean/common/context.py | 2 +- bilean/common/exception.py | 8 + bilean/db/api.py | 57 ++++- bilean/db/sqlalchemy/api.py | 120 ++++++++-- .../migrate_repo/versions/001_bilean_init.py | 48 +++- bilean/db/sqlalchemy/models.py | 53 ++++- bilean/engine/actions/base.py | 21 +- bilean/engine/actions/user_action.py | 38 ++- bilean/engine/consumption.py | 118 ++++++++++ bilean/engine/event.py | 218 +++++++++++------- bilean/engine/flows/flow.py | 127 +++++++--- bilean/engine/policy.py | 30 ++- bilean/engine/service.py | 16 +- bilean/engine/user.py | 120 +++++----- bilean/resources/base.py | 114 ++++++++- bilean/scheduler/cron_scheduler.py | 1 - 17 files changed, 861 insertions(+), 266 deletions(-) create mode 100644 bilean/engine/consumption.py diff --git a/bilean/common/consts.py b/bilean/common/consts.py index 60b69dc..5df9071 100644 --- a/bilean/common/consts.py +++ b/bilean/common/consts.py @@ -75,10 +75,10 @@ USER_KEYS = ( RESOURCE_KEYS = ( RES_ID, RES_USER_ID, RES_RULE_ID, RES_RESOURCE_TYPE, RES_PROPERTIES, - RES_RATE, RES_CREATED_AT, RES_UPDATED_AT, RES_DELETED_AT, + RES_RATE, RES_LAST_BILL, RES_CREATED_AT, RES_UPDATED_AT, RES_DELETED_AT, ) = ( 'id', 'user_id', 'rule_id', 'resource_type', 'properties', - 'rate', 'created_at', 'updated_at', 'deleted_at', + 'rate', 'last_bill', 'created_at', 'updated_at', 'deleted_at', ) RULE_KEYS = ( @@ -90,11 +90,12 @@ RULE_KEYS = ( ) EVENT_KEYS = ( - EVENT_ID, EVENT_USER_ID, EVENT_ACTION, EVENT_TIMESTAMP, - EVENT_RESOURCE_TYPE, EVENT_VALUE, EVENT_DELETED_AT, + EVENT_ID, EVENT_TIMESTAMP, EVENT_OBJ_ID, EVENT_OBJ_TYPE, EVENT_ACTION, + EVENT_USER_ID, EVENT_LEVEL, EVENT_STATUS, EVENT_STATUS_REASON, + EVENT_METADATA, ) = ( - 'id', 'user_id', 'action', 'timestamp', - 'resource_type', 'value', 'deleted_at', + 'id', 'timestamp', 'obj_id', 'obj_type', 'action', + 'user_id', 'level', 'status', 'status_reason', 'metadata', ) POLICY_KEYS = ( @@ -104,3 +105,26 @@ POLICY_KEYS = ( 'id', 'name', 'is_default', 'rules', 'metadata', 'created_at', 'updated_at', 'deleted_at', ) + +CONSUMPTION_KEYS = ( + CONSUMPTION_ID, CONSUMPTION_USER_ID, CONSUMPTION_RESOURCE_ID, + CONSUMPTION_RESOURCE_TYPE, CONSUMPTION_START_TIME, CONSUMPTION_END_TIME, + CONSUMPTION_RATE, CONSUMPTION_COST, CONSUMPTION_METADATA, +) = ( + 'id', 'user_id', 'resource_id', + 'resource_type', 'start_time', 'end_time', + 'rate', 'cost', 'metadata', +) + +RECHARGE_KEYS = ( + RECHARGE_ID, RECHARGE_USER_ID, RECHARGE_TYPE, RECHARGE_TIMESTAMP, + RECHARGE_METADATA, +) = ( + 'id', 'user_id', 'type', 'timestamp', 'metadata', +) + +RECHARGE_TYPES = ( + SELF_RECHARGE, SYSTEM_BONUS, +) = ( + 'Recharge', 'System bonus', +) diff --git a/bilean/common/context.py b/bilean/common/context.py index def3d50..3e4c208 100644 --- a/bilean/common/context.py +++ b/bilean/common/context.py @@ -123,7 +123,7 @@ def get_service_context(set_project_id=False, **kwargs): if set_project_id: project = identity_service().conn.session.get_project_id() service_creds.update(project=project) - return RequestContext(**service_creds) + return RequestContext(is_admin=True, **service_creds) def get_admin_context(show_deleted=False): diff --git a/bilean/common/exception.py b/bilean/common/exception.py index 5fc76a9..7bd3b4c 100644 --- a/bilean/common/exception.py +++ b/bilean/common/exception.py @@ -120,6 +120,10 @@ class PolicyNotFound(BileanException): msg_fmt = _("The policy (%(policy)s) could not be found.") +class MultipleDefaultPolicy(BileanException): + msg_fmt = _("More than one default policies found.") + + class UserNotFound(BileanException): msg_fmt = _("The user (%(user)s) could not be found.") @@ -159,6 +163,10 @@ class EventNotFound(BileanException): msg_fmt = _("The event (%(event)s) could not be found.") +class ConsumptionNotFound(BileanException): + msg_fmt = _("The consumption (%(consumption)s) could not be found.") + + class InvalidResource(BileanException): msg_fmt = _("%(msg)") diff --git a/bilean/db/api.py b/bilean/db/api.py index f570279..8799e72 100644 --- a/bilean/db/api.py +++ b/bilean/db/api.py @@ -129,16 +129,14 @@ def event_get(context, event_id, project_safe=True): return IMPL.event_get(context, event_id, project_safe=project_safe) -def event_get_all(context, user_id=None, show_deleted=False, - filters=None, limit=None, marker=None, - sort_keys=None, sort_dir=None, project_safe=True, - start_time=None, end_time=None): - return IMPL.event_get_all(context, user_id=user_id, - show_deleted=show_deleted, - filters=filters, limit=limit, - marker=marker, sort_keys=sort_keys, - sort_dir=sort_dir, project_safe=project_safe, - start_time=start_time, end_time=end_time) +def event_get_all(context, limit=None, marker=None, sort_keys=None, + sort_dir=None, filters=None, project_safe=True): + return IMPL.event_get_all(context, limit=limit, + marker=marker, + sort_keys=sort_keys, + sort_dir=sort_dir, + filters=filters, + project_safe=project_safe) def event_create(context, values): @@ -306,3 +304,42 @@ def service_get_by_host_and_binary(context, host, binary): def service_get_all(context): return IMPL.service_get_all(context) + + +# consumptions +def consumption_get(context, consumption_id, project_safe=True): + return IMPL.consumption_get(context, consumption_id, + project_safe=project_safe) + + +def consumption_get_all(context, limit=None, marker=None, sort_keys=None, + sort_dir=None, filters=None, project_safe=True): + return IMPL.consumption_get_all(context, limit=limit, + marker=marker, + sort_keys=sort_keys, + sort_dir=sort_dir, + filters=filters, + project_safe=project_safe) + + +def consumption_create(context, values): + return IMPL.consumption_create(context, values) + + +# recharges +def recharge_create(context, values): + return IMPL.recharge_create(context, values) + + +def recharge_get(context, recharge_id, project_safe=True): + return IMPL.recharge_get(context, recharge_id, project_safe=project_safe) + + +def recharge_get_all(context, limit=None, marker=None, sort_keys=None, + sort_dir=None, filters=None, project_safe=True): + return IMPL.recharge_get_all(context, limit=limit, + marker=marker, + sort_keys=sort_keys, + sort_dir=sort_dir, + filters=filters, + project_safe=project_safe) diff --git a/bilean/db/sqlalchemy/api.py b/bilean/db/sqlalchemy/api.py index 3ba1b68..2f153c3 100644 --- a/bilean/db/sqlalchemy/api.py +++ b/bilean/db/sqlalchemy/api.py @@ -351,32 +351,22 @@ def event_get(context, event_id, project_safe=True): return event -def event_get_all(context, user_id=None, limit=None, marker=None, - sort_keys=None, sort_dir=None, filters=None, - start_time=None, end_time=None, project_safe=True, - show_deleted=False): - query = soft_delete_aware_query(context, models.Event, - show_deleted=show_deleted) +def event_get_all(context, limit=None, marker=None, sort_keys=None, + sort_dir=None, filters=None, project_safe=True): + query = model_query(context, models.Event) + if context.is_admin: + project_safe = False if project_safe: query = query.filter_by(user_id=context.project) - - elif user_id: - query = query.filter_by(user_id=user_id) - - if start_time: - query = query.filter_by(models.Event.timestamp >= start_time) - if end_time: - query = query.filter_by(models.Event.timestamp <= end_time) - if filters is None: filters = {} sort_key_map = { - consts.EVENT_ACTION: models.Event.action.key, - consts.EVENT_RESOURCE_TYPE: models.Event.resource_type.key, + consts.EVENT_LEVEL: models.Event.level.key, consts.EVENT_TIMESTAMP: models.Event.timestamp.key, consts.EVENT_USER_ID: models.Event.user_id.key, + consts.EVENT_STATUS: models.Event.status.key, } keys = _get_sort_keys(sort_keys, sort_key_map) query = db_filters.exact_filter(query, models.Event, filters) @@ -875,3 +865,99 @@ def service_get_by_host_and_binary(context, host, binary): def service_get_all(context): return model_query(context, models.Service).all() + + +# consumptions +def consumption_get(context, consumption_id, project_safe=True): + query = model_query(context, models.Consumption) + consumption = query.get(consumption_id) + + if consumption is None: + return None + + if project_safe and context.project != consumption.user_id: + return None + + return consumption + + +def consumption_get_all(context, limit=None, marker=None, sort_keys=None, + sort_dir=None, filters=None, project_safe=True): + query = model_query(context, models.Consumption) + + if context.is_admin: + project_safe = False + if project_safe: + query = query.filter_by(user_id=context.project) + if filters is None: + filters = {} + + sort_key_map = { + consts.CONSUMPTION_USER_ID: models.Consumption.user_id.key, + consts.CONSUMPTION_RESOURCE_TYPE: models.Consumption.resource_type.key, + consts.CONSUMPTION_START_TIME: models.Consumption.start_time.key, + } + keys = _get_sort_keys(sort_keys, sort_key_map) + query = db_filters.exact_filter(query, models.Consumption, filters) + return _paginate_query(context, query, models.Consumption, + limit=limit, marker=marker, + sort_keys=keys, sort_dir=sort_dir, + default_sort_keys=['id']).all() + + +def consumption_create(context, values): + consumption_ref = models.Consumption() + consumption_ref.update(values) + consumption_ref.save(_session(context)) + return consumption_ref + + +def consumption_delete(context, consumption_id): + session = _session(context) + session.query(models.Consumption).filter_by( + id=consumption_id).delete(synchronize_session='fetch') + + +# recharges +def recharge_create(context, values): + recharge_ref = models.Recharge() + recharge_ref.update(values) + recharge_ref.save(_session(context)) + return recharge_ref + + +def recharge_get(context, recharge_id, project_safe=True): + query = model_query(context, models.Recharge) + recharge = query.get(recharge_id) + + if recharge is None: + return None + + if project_safe and context.project != recharge.user_id: + return None + + return recharge + + +def recharge_get_all(context, limit=None, marker=None, sort_keys=None, + sort_dir=None, filters=None, project_safe=True): + query = model_query(context, models.Recharge) + + if context.is_admin: + project_safe = False + if project_safe: + query = query.filter_by(user_id=context.project) + if filters is None: + filters = {} + + sort_key_map = { + consts.RECHARGE_USER_ID: models.Recharge.user_id.key, + consts.RECHARGE_TYPE: models.Recharge.type.key, + consts.RECHARGE_TIMESTAMP: models.Recharge.timestamp.key, + } + keys = _get_sort_keys(sort_keys, sort_key_map) + query = db_filters.exact_filter(query, models.Recharge, filters) + return _paginate_query(context, query, models.Recharge, + limit=limit, marker=marker, + sort_keys=keys, sort_dir=sort_dir, + default_sort_keys=['id']).all() diff --git a/bilean/db/sqlalchemy/migrate_repo/versions/001_bilean_init.py b/bilean/db/sqlalchemy/migrate_repo/versions/001_bilean_init.py index c3825ce..9074bc2 100644 --- a/bilean/db/sqlalchemy/migrate_repo/versions/001_bilean_init.py +++ b/bilean/db/sqlalchemy/migrate_repo/versions/001_bilean_init.py @@ -80,12 +80,10 @@ def upgrade(migrate_engine): sqlalchemy.String(36), sqlalchemy.ForeignKey('user.id'), nullable=False), - sqlalchemy.Column('rule_id', - sqlalchemy.String(36), - sqlalchemy.ForeignKey('rule.id'), - nullable=False), + sqlalchemy.Column('rule_id', sqlalchemy.String(36), nullable=False), sqlalchemy.Column('resource_type', sqlalchemy.String(36), nullable=False), + sqlalchemy.Column('last_bill', sqlalchemy.DateTime), sqlalchemy.Column('properties', types.Dict), sqlalchemy.Column('rate', sqlalchemy.Float, nullable=False), sqlalchemy.Column('created_at', sqlalchemy.DateTime), @@ -99,13 +97,45 @@ def upgrade(migrate_engine): 'event', meta, sqlalchemy.Column('id', sqlalchemy.String(36), primary_key=True, nullable=False), - sqlalchemy.Column('user_id', sqlalchemy.String(36), - sqlalchemy.ForeignKey('user.id'), nullable=False), sqlalchemy.Column('timestamp', sqlalchemy.DateTime), - sqlalchemy.Column('resource_type', sqlalchemy.String(36)), + sqlalchemy.Column('obj_id', sqlalchemy.String(36)), + sqlalchemy.Column('obj_type', sqlalchemy.String(36)), + sqlalchemy.Column('obj_name', sqlalchemy.String(255)), sqlalchemy.Column('action', sqlalchemy.String(36)), + sqlalchemy.Column('user_id', sqlalchemy.String(36)), + sqlalchemy.Column('level', sqlalchemy.Integer), + sqlalchemy.Column('status', sqlalchemy.String(255)), + sqlalchemy.Column('status_reason', sqlalchemy.Text), + sqlalchemy.Column('meta_data', types.Dict), + mysql_engine='InnoDB', + mysql_charset='utf8' + ) + + consumption = sqlalchemy.Table( + 'consumption', meta, + sqlalchemy.Column('id', sqlalchemy.String(36), + primary_key=True, nullable=False), + sqlalchemy.Column('user_id', sqlalchemy.String(36)), + sqlalchemy.Column('resource_id', sqlalchemy.String(36)), + sqlalchemy.Column('resource_type', sqlalchemy.String(255)), + sqlalchemy.Column('start_time', sqlalchemy.DateTime), + sqlalchemy.Column('end_time', sqlalchemy.DateTime), + sqlalchemy.Column('rate', sqlalchemy.Float), + sqlalchemy.Column('cost', sqlalchemy.Float), + sqlalchemy.Column('meta_data', types.Dict), + mysql_engine='InnoDB', + mysql_charset='utf8' + ) + + recharge = sqlalchemy.Table( + 'recharge', meta, + sqlalchemy.Column('id', sqlalchemy.String(36), + primary_key=True, nullable=False), + sqlalchemy.Column('user_id', sqlalchemy.String(36)), + sqlalchemy.Column('type', sqlalchemy.String(255)), + sqlalchemy.Column('timestamp', sqlalchemy.DateTime), sqlalchemy.Column('value', sqlalchemy.Float), - sqlalchemy.Column('deleted_at', sqlalchemy.DateTime), + sqlalchemy.Column('meta_data', types.Dict), mysql_engine='InnoDB', mysql_charset='utf8' ) @@ -180,6 +210,8 @@ def upgrade(migrate_engine): rule, resource, event, + consumption, + recharge, action, dependency, user_lock, diff --git a/bilean/db/sqlalchemy/models.py b/bilean/db/sqlalchemy/models.py index ca7bd2d..3fe41ce 100644 --- a/bilean/db/sqlalchemy/models.py +++ b/bilean/db/sqlalchemy/models.py @@ -143,14 +143,12 @@ class Resource(BASE, BileanBase, SoftDelete, models.TimestampMixin): sqlalchemy.String(36), sqlalchemy.ForeignKey('user.id'), nullable=False) - rule_id = sqlalchemy.Column( - sqlalchemy.String(36), - sqlalchemy.ForeignKey('rule.id'), - nullable=True) + rule_id = sqlalchemy.Column(sqlalchemy.String(36), nullable=True) user = relationship(User, backref=backref('resources')) resource_type = sqlalchemy.Column(sqlalchemy.String(36), nullable=False) - properties = sqlalchemy.Column(types.Dict) rate = sqlalchemy.Column(sqlalchemy.Float, nullable=False) + last_bill = sqlalchemy.Column(sqlalchemy.DateTime) + properties = sqlalchemy.Column(types.Dict) class Action(BASE, BileanBase, StateAware, models.TimestampMixin): @@ -187,21 +185,52 @@ class ActionDependency(BASE, BileanBase): nullable=False) -class Event(BASE, BileanBase, SoftDelete): +class Event(BASE, BileanBase, StateAware): """Represents an event generated by the bilean engine.""" __tablename__ = 'event' id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True, default=lambda: UUID4()) - user_id = sqlalchemy.Column(sqlalchemy.String(36), - sqlalchemy.ForeignKey('user.id'), - nullable=False) - user = relationship(User, backref=backref('events')) - action = sqlalchemy.Column(sqlalchemy.String(36)) timestamp = sqlalchemy.Column(sqlalchemy.DateTime) - resource_type = sqlalchemy.Column(sqlalchemy.String(36)) + obj_id = sqlalchemy.Column(sqlalchemy.String(36)) + obj_type = sqlalchemy.Column(sqlalchemy.String(36)) + obj_name = sqlalchemy.Column(sqlalchemy.String(255)) + action = sqlalchemy.Column(sqlalchemy.String(36)) + user_id = sqlalchemy.Column(sqlalchemy.String(36)) + level = sqlalchemy.Column(sqlalchemy.Integer) + meta_data = sqlalchemy.Column(types.Dict) + + +class Consumption(BASE, BileanBase): + """Consumption objects.""" + + __tablename__ = 'consumption' + + id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True, + default=lambda: UUID4()) + user_id = sqlalchemy.Column(sqlalchemy.String(36)) + resource_id = sqlalchemy.Column(sqlalchemy.String(36)) + resource_type = sqlalchemy.Column(sqlalchemy.String(255)) + start_time = sqlalchemy.Column(sqlalchemy.DateTime) + end_time = sqlalchemy.Column(sqlalchemy.DateTime) + rate = sqlalchemy.Column(sqlalchemy.Float) + cost = sqlalchemy.Column(sqlalchemy.Float) + meta_data = sqlalchemy.Column(types.Dict) + + +class Recharge(BASE, BileanBase): + """Recharge history.""" + + __tablename__ = 'recharge' + + id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True, + default=lambda: UUID4()) + user_id = sqlalchemy.Column(sqlalchemy.String(36)) + type = sqlalchemy.Column(sqlalchemy.String(255)) + timestamp = sqlalchemy.Column(sqlalchemy.DateTime) value = sqlalchemy.Column(sqlalchemy.Float) + meta_data = sqlalchemy.Column(types.Dict) class Job(BASE, BileanBase): diff --git a/bilean/engine/actions/base.py b/bilean/engine/actions/base.py index 61e0857..11de5cb 100644 --- a/bilean/engine/actions/base.py +++ b/bilean/engine/actions/base.py @@ -22,6 +22,7 @@ from bilean.common import exception from bilean.common.i18n import _ from bilean.common.i18n import _LE from bilean.db import api as db_api +from bilean.engine import event as EVENT wallclock = time.time LOG = logging.getLogger(__name__) @@ -262,12 +263,12 @@ class Action(object): expected_statuses = (self.SUSPENDED) if self.status not in expected_statuses: - msg = _LE("Action (%(action)s) is in unexpected status " - "(%(actual)s) while expected status should be one of " - "(%(expected)s).") % dict(action=self.id, - expected=expected_statuses, - actual=self.status) - LOG.error(msg) + reason = _("Action (%(action)s) is in unexpected status " + "(%(actual)s) while expected status should be one of " + "(%(expected)s).") % dict(action=self.id, + expected=expected_statuses, + actual=self.status) + EVENT.error(self.context, self, cmd, status_reason=reason) return db_api.action_signal(self.context, self.id, cmd) @@ -311,6 +312,13 @@ class Action(object): # We abandon it and then notify other dispatchers to execute it db_api.action_abandon(self.context, self.id) + if status == self.SUCCEEDED: + EVENT.info(self.context, self, self.action, status, reason) + elif status == self.READY: + EVENT.warning(self.context, self, self.action, status, reason) + else: + EVENT.error(self.context, self, self.action, status, reason) + self.status = status self.status_reason = reason @@ -327,6 +335,7 @@ class Action(object): def _check_signal(self): # Check timeout first, if true, return timeout message if self.timeout is not None and self.is_timeout(): + EVENT.debug(self.context, self, self.action, 'TIMEOUT') return self.RES_TIMEOUT result = db_api.action_signal_query(self.context, self.id) diff --git a/bilean/engine/actions/user_action.py b/bilean/engine/actions/user_action.py index fbce963..dde215c 100644 --- a/bilean/engine/actions/user_action.py +++ b/bilean/engine/actions/user_action.py @@ -17,8 +17,10 @@ from bilean.common.i18n import _ from bilean.common.i18n import _LE from bilean.common.i18n import _LI from bilean.engine.actions import base +from bilean.engine import event as EVENT from bilean.engine.flows import flow as bilean_flow from bilean.engine import lock as bilean_lock +from bilean.engine import user as user_mod from bilean.resources import base as resource_base from oslo_log import log as logging @@ -37,12 +39,28 @@ class UserAction(base.Action): 'USER_SETTLE_ACCOUNT', ) + def __init__(self, target, action, context, **kwargs): + """Constructor for a user action object. + + :param target: ID of the target user object on which the action is to + be executed. + :param action: The name of the action to be executed. + :param context: The context used for accessing the DB layer. + :param dict kwargs: Additional parameters that can be passed to the + action. + """ + super(UserAction, self).__init__(target, action, context, **kwargs) + + try: + self.user = user_mod.User.load(self.context, user_id=self.target) + except Exception: + self.user = None + def do_create_resource(self): resource = resource_base.Resource.from_dict(self.inputs) try: - flow_engine = bilean_flow.get_flow(self.context, - resource, - 'create') + flow_engine = bilean_flow.get_create_resource_flow( + self.context, self.target, resource) with bilean_flow.DynamicLogListener(flow_engine, logger=LOG): flow_engine.run() except Exception as ex: @@ -55,7 +73,8 @@ class UserAction(base.Action): def do_update_resource(self): try: - resource_id = self.inputs.get('id') + values = self.inputs + resource_id = values.pop('id', None) resource = resource_base.Resource.load( self.context, resource_id=resource_id) except exception.ResourceNotFound: @@ -64,9 +83,8 @@ class UserAction(base.Action): return self.RES_ERROR, _('Resource not found.') try: - flow_engine = bilean_flow.get_flow(self.context, - resource, - 'update') + flow_engine = bilean_flow.get_update_resource_flow( + self.context, self.target, resource, values) with bilean_flow.DynamicLogListener(flow_engine, logger=LOG): flow_engine.run() except Exception as ex: @@ -89,9 +107,8 @@ class UserAction(base.Action): return self.RES_ERROR, _('Resource not found.') try: - flow_engine = bilean_flow.get_flow(self.context, - resource, - 'delete') + flow_engine = bilean_flow.get_delete_resource_flow( + self.context, self.target, resource) with bilean_flow.DynamicLogListener(flow_engine, logger=LOG): flow_engine.run() except Exception as ex: @@ -126,6 +143,7 @@ class UserAction(base.Action): if method is None: reason = _('Unsupported action: %s') % self.action + EVENT.error(self.context, self.user, self.action, 'Failed', reason) return self.RES_ERROR, reason return method() diff --git a/bilean/engine/consumption.py b/bilean/engine/consumption.py new file mode 100644 index 0000000..185cf5a --- /dev/null +++ b/bilean/engine/consumption.py @@ -0,0 +1,118 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from bilean.common import exception +from bilean.common import utils +from bilean.db import api as db_api + +from oslo_log import log as logging + +LOG = logging.getLogger(__name__) + + +class Consumption(object): + """Class reference to consumption record.""" + + def __init__(self, user_id, **kwargs): + self.id = kwargs.get('id') + self.user_id = user_id + + self.resource_id = kwargs.get('resource_id') + self.resource_type = kwargs.get('resource_type') + + self.start_time = kwargs.get('start_time') + self.end_time = kwargs.get('end_time') + self.rate = kwargs.get('rate') + self.cost = kwargs.get('cost') + self.metadata = kwargs.get('metadata') + + @classmethod + def from_db_record(cls, record): + '''Construct a consumption object from a database record.''' + + kwargs = { + 'id': record.id, + 'resource_id': record.resource_id, + 'resource_type': record.resource_type, + 'start_time': record.start_time, + 'end_time': record.end_time, + 'rate': record.rate, + 'cost': record.cost, + 'metadata': record.meta_data, + } + return cls(record.user_id, **kwargs) + + @classmethod + def load(cls, context, db_consumption=None, consumption_id=None, + project_safe=True): + '''Retrieve a consumption record from database.''' + if db_consumption is not None: + return cls.from_db_record(db_consumption) + + record = db_api.consumption_get(context, consumption_id, + project_safe=project_safe) + if record is None: + raise exception.ConsumptionNotFound(consumption=consumption_id) + + return cls.from_db_record(record) + + @classmethod + def load_all(cls, context, limit=None, marker=None, sort_keys=None, + sort_dir=None, filters=None, project_safe=True): + '''Retrieve all consumptions from database.''' + + records = db_api.consumption_get_all(context, limit=limit, + marker=marker, + filters=filters, + sort_keys=sort_keys, + sort_dir=sort_dir, + project_safe=project_safe) + + for record in records: + yield cls.from_db_record(record) + + def store(self, context): + '''Store the consumption into database and return its ID.''' + values = { + 'user_id': self.user_id, + 'resource_id': self.resource_id, + 'resource_type': self.resource_type, + 'start_time': self.start_time, + 'end_time': self.end_time, + 'rate': self.rate, + 'cost': self.cost, + 'meta_data': self.metadata, + } + + consumption = db_api.consumption_create(context, values) + self.id = consumption.id + + return self.id + + def delete(self, context): + '''Delete consumption from database.''' + db_api.consumption_delete(context, self.id) + + def to_dict(self): + consumption = { + 'id': self.id, + 'user_id': self.user_id, + 'resource_id': self.resource_id, + 'resource_type': self.resource_type, + 'start_time': utils.format_time(self.start_time), + 'end_time': utils.format_time(self.end_time), + 'rate': self.rate, + 'cost': self.cost, + 'metadata': self.metadata, + } + return consumption diff --git a/bilean/engine/event.py b/bilean/engine/event.py index 3db4821..c72479c 100644 --- a/bilean/engine/event.py +++ b/bilean/engine/event.py @@ -11,29 +11,52 @@ # License for the specific language governing permissions and limitations # under the License. -import six +import logging from bilean.common import exception from bilean.common.i18n import _ +from bilean.common.i18n import _LC +from bilean.common.i18n import _LE +from bilean.common.i18n import _LI +from bilean.common.i18n import _LW from bilean.common import utils from bilean.db import api as db_api -from bilean.resources import base as resource_base -from oslo_log import log as logging +from oslo_log import log +from oslo_utils import reflection from oslo_utils import timeutils -LOG = logging.getLogger(__name__) +LOG = log.getLogger(__name__) class Event(object): - """Class to deal with consumption record.""" + '''capturing an interesting happening in Bilean.''' - def __init__(self, timestamp, **kwargs): + def __init__(self, timestamp, level, entity=None, **kwargs): self.timestamp = timestamp + self.level = level + + self.id = kwargs.get('id') self.user_id = kwargs.get('user_id') + self.action = kwargs.get('action') - self.resource_type = kwargs.get('resource_type') - self.value = kwargs.get('value', 0) + self.status = kwargs.get('status') + self.status_reason = kwargs.get('status_reason') + + self.obj_id = kwargs.get('obj_id') + self.obj_type = kwargs.get('obj_type') + self.obj_name = kwargs.get('obj_name') + self.metadata = kwargs.get('metadata') + + cntx = kwargs.get('context') + if cntx is not None: + self.user_id = cntx.project + + if entity is not None: + self.obj_id = entity.id + self.obj_name = entity.name + e_type = reflection.get_class_name(entity, fully_qualified=False) + self.obj_type = e_type.upper() @classmethod def from_db_record(cls, record): @@ -43,11 +66,14 @@ class Event(object): 'id': record.id, 'user_id': record.user_id, 'action': record.action, - 'resource_type': record.resource_type, - 'action': record.action, - 'value': record.value, + 'status': record.status, + 'status_reason': record.status_reason, + 'obj_id': record.obj_id, + 'obj_type': record.obj_type, + 'obj_name': record.obj_name, + 'metadata': record.meta_data, } - return cls(record.timestamp, **kwargs) + return cls(record.timestamp, record.level, **kwargs) @classmethod def load(cls, context, db_event=None, event_id=None, project_safe=True): @@ -62,19 +88,16 @@ class Event(object): return cls.from_db_record(record) @classmethod - def load_all(cls, context, user_id=None, limit=None, marker=None, - sort_keys=None, sort_dir=None, filters=None, - start_time=None, end_time=None, project_safe=True, - show_deleted=False,): + def load_all(cls, context, limit=None, marker=None, sort_keys=None, + sort_dir=None, filters=None, project_safe=True): '''Retrieve all events from database.''' - records = db_api.event_get_all(context, user_id=user_id, limit=limit, - marker=marker, filters=filters, - sort_keys=sort_keys, sort_dir=sort_dir, - start_time=start_time, - end_time=end_time, - project_safe=project_safe, - show_deleted=show_deleted) + records = db_api.event_get_all(context, limit=limit, + marker=marker, + filters=filters, + sort_keys=sort_keys, + sort_dir=sort_dir, + project_safe=project_safe) for record in records: yield cls.from_db_record(record) @@ -83,11 +106,15 @@ class Event(object): '''Store the event into database and return its ID.''' values = { 'timestamp': self.timestamp, + 'level': self.level, 'user_id': self.user_id, 'action': self.action, - 'resource_type': self.resource_type, - 'action': self.action, - 'value': self.value, + 'status': self.status, + 'status_reason': self.status_reason, + 'obj_id': self.obj_id, + 'obj_type': self.obj_type, + 'obj_name': self.obj_name, + 'meta_data': self.metadata, } event = db_api.event_create(context, values) @@ -95,79 +122,92 @@ class Event(object): return self.id - @classmethod - def from_dict(cls, **kwargs): - timestamp = kwargs.pop('timestamp') - return cls(timestamp, kwargs) - def to_dict(self): evt = { 'id': self.id, + 'level': self.level, 'user_id': self.user_id, 'action': self.action, - 'resource_type': self.resource_type, - 'action': self.action, - 'value': self.value, + 'status': self.status, + 'status_reason': self.status_reason, + 'obj_id': self.obj_id, + 'obj_type': self.obj_type, + 'obj_name': self.obj_name, 'timestamp': utils.format_time(self.timestamp), + 'metadata': self.metadata, } return evt -def record(context, user, timestamp=None, action='charge', cause_resource=None, - resource_action=None, extra_cost=0, value=0): - """Generate events for specify user +def critical(context, entity, action, status=None, status_reason=None, + timestamp=None): + timestamp = timestamp or timeutils.utcnow() + event = Event(timestamp, logging.CRITICAL, entity, + action=action, status=status, status_reason=status_reason, + user_id=context.project) + event.store(context) + LOG.critical(_LC('%(name)s [%(id)s] - %(status)s: %(reason)s'), + {'name': event.obj_name, + 'id': event.obj_id and event.obj_id[:8], + 'status': status, + 'reason': status_reason}) - :param context: oslo.messaging.context - :param user: object user to mark event - :param action: action of event, include 'charge' and 'recharge' - :param cause_resource: object resource which triggered the action - :param resource_action: action of resource - :param extra_cost: extra cost of the resource - :param timestamp: timestamp when event occurs - :param value: value of recharge, needed when action is 'recharge' - """ - if timestamp is None: - timestamp = timeutils.utcnow() - try: - if action == 'charge': - resources = resource_base.Resource.load_all( - context, user_id=user.id, project_safe=False) - seconds = (timestamp - user.last_bill).total_seconds() - res_mapping = {} - for resource in resources: - if cause_resource and resource.id == cause_resource.id: - if resource_action == 'create': - usage = extra_cost - elif resource_action == 'update': - usage = resource.rate * seconds + extra_cost - else: - usage = resource.rate * seconds - if res_mapping.get(resource.resource_type) is None: - res_mapping[resource.resource_type] = usage - else: - res_mapping[resource.resource_type] += usage - if resource_action == 'delete': - usage = cause_resource.rate * seconds + extra_cost - if res_mapping.get(cause_resource.resource_type) is None: - res_mapping[cause_resource.resource_type] = 0 - res_mapping[cause_resource.resource_type] += usage +def error(context, entity, action, status=None, status_reason=None, + timestamp=None): + timestamp = timestamp or timeutils.utcnow() + event = Event(timestamp, logging.ERROR, entity, + action=action, status=status, status_reason=status_reason, + user_id=context.project) + event.store(context) + LOG.error(_LE('%(name)s [%(id)s] %(action)s - %(status)s: %(reason)s'), + {'name': event.obj_name, + 'id': event.obj_id and event.obj_id[:8], + 'action': action, + 'status': status, + 'reason': status_reason}) - for res_type in res_mapping.keys(): - event = Event(timestamp, - user_id=user.id, - action=action, - resource_type=res_type, - value=res_mapping.get(res_type)) - event.store(context) - elif action == 'recharge': - event = Event(timestamp, - user_id=user.id, - action=action, - value=value) - event.store(context) - else: - msg = _("Unsupported event action '%s'.") % action - raise exception.BileanException(msg=msg) - except Exception as exc: - LOG.error(_("Error generate events: %s") % six.text_type(exc)) + +def warning(context, entity, action, status=None, status_reason=None, + timestamp=None): + timestamp = timestamp or timeutils.utcnow() + event = Event(timestamp, logging.WARNING, entity, + action=action, status=status, status_reason=status_reason, + user_id=context.project) + event.store(context) + LOG.warning(_LW('%(name)s [%(id)s] %(action)s - %(status)s: %(reason)s'), + {'name': event.obj_name, + 'id': event.obj_id and event.obj_id[:8], + 'action': action, + 'status': status, + 'reason': status_reason}) + + +def info(context, entity, action, status=None, status_reason=None, + timestamp=None): + timestamp = timestamp or timeutils.utcnow() + event = Event(timestamp, logging.INFO, entity, + action=action, status=status, status_reason=status_reason, + user_id=context.project) + event.store(context) + LOG.info(_LI('%(name)s [%(id)s] %(action)s - %(status)s: %(reason)s'), + {'name': event.obj_name, + 'id': event.obj_id and event.obj_id[:8], + 'action': action, + 'status': status, + 'reason': status_reason}) + + +def debug(context, entity, action, status=None, status_reason=None, + timestamp=None): + timestamp = timestamp or timeutils.utcnow() + event = Event(timestamp, logging.DEBUG, entity, + action=action, status=status, status_reason=status_reason, + user_id=context.project) + event.store(context) + LOG.debug(_('%(name)s [%(id)s] %(action)s - %(status)s: %(reason)s'), + {'name': event.obj_name, + 'id': event.obj_id and event.obj_id[:8], + 'action': action, + 'status': status, + 'reason': status_reason}) diff --git a/bilean/engine/flows/flow.py b/bilean/engine/flows/flow.py index fc20018..8b5cbfb 100644 --- a/bilean/engine/flows/flow.py +++ b/bilean/engine/flows/flow.py @@ -69,12 +69,16 @@ class CreateResourceTask(task.Task): def execute(self, context, resource, **kwargs): user = user_mod.User.load(context, user_id=resource.user_id) - user_policy = policy_mod.Policy.load(context, policy_id=user.policy_id) - rule = user_policy.find_rule(context, resource.resource_type) + try: + policy = policy_mod.Policy.load(context, policy_id=user.policy_id) + except exception.PolicyNotFound: + policy = policy_mod.Policy.load_default(context) + if policy is not None: + rule = policy.find_rule(context, resource.resource_type) - # Update resource with rule_id and rate - resource.rule_id = rule.id - resource.rate = rule.get_price(resource) + # Update resource with rule_id and rate + resource.rule_id = rule.id + resource.rate = rule.get_price(resource) resource.store(context) def revert(self, context, resource, result, **kwargs): @@ -94,7 +98,7 @@ class UpdateResourceTask(task.Task): resource.properties = values.get('properties') rule = rule_base.Rule.load(context, rule_id=resource.rule_id) resource.rate = rule.get_price(resource) - resource.d_rate = resource.rate - old_rate + resource.delta_rate = resource.rate - old_rate resource.store(context) def revert(self, context, resource, resource_bak, result, **kwargs): @@ -122,6 +126,25 @@ class DeleteResourceTask(task.Task): resource.store(context) +class CreateConsumptionTask(task.Task): + """Generate consumption record and store to db.""" + + def execute(self, context, resource, *args, **kwargs): + consumption = resource.consumption + if consumption is not None: + consumption.store(context) + + def revert(self, context, resource, result, *args, **kwargs): + if isinstance(result, ft.Failure): + LOG.error(_LE("Error when storing consumption of resource: %s"), + resource.id) + return + + consumption = resource.consumption + if consumption is not None: + consumption.delete(context) + + class LoadUserTask(task.Task): """Load user from db.""" @@ -150,17 +173,18 @@ class SettleAccountTask(task.Task): user.store(context) -class UpdateUserWithResourceTask(task.Task): - """Update user with resource actions.""" +class UpdateUserRateTask(task.Task): + """Update user's rate .""" - def execute(self, context, user_obj, user_bak, resource, - resource_action, **kwargs): - user_obj.update_with_resource(context, resource, - resource_action=resource_action) + def execute(self, context, user_obj, user_bak, resource, *args, **kwargs): + user_obj.update_rate(context, resource.delta_rate, + timestamp=resource.last_bill, + delayed_cost=resource.delayed_cost) - def revert(self, context, user_bak, result, **kwargs): + def revert(self, context, user_obj, user_bak, resource, result, + *args, **kwargs): if isinstance(result, ft.Failure): - LOG.error(_LE("Error when updating user: %s"), user_bak.get('id')) + LOG.error(_LE("Error when updating user: %s"), user_obj.id) return # Restore user @@ -196,25 +220,72 @@ def get_settle_account_flow(context, user_id, task=None): return taskflow.engines.load(flow, store=kwargs) -def get_flow(context, resource, resource_action): - """Constructs and returns resource task flow.""" +def get_create_resource_flow(context, user_id, resource): + """Constructs and returns user task flow. - flow_name = resource.user_id + '_' + resource_action + '_resource' + :param context: The request context. + :param user_id: The ID of user. + :param resource: Object resource to create. + """ + + flow_name = user_id + '_create_resource' flow = linear_flow.Flow(flow_name) kwargs = { 'context': context, - 'user_id': resource.user_id, + 'user_id': user_id, 'resource': resource, - 'resource_action': resource_action, } - if resource_action == 'create': - flow.add(CreateResourceTask()) - if resource_action == 'update': - flow.add(UpdateResourceTask()) - kwargs['resource_bak'] = resource.to_dict() - elif resource_action == 'delete': - flow.add(DeleteResourceTask()) - flow.add(LoadUserTask(), - UpdateUserWithResourceTask(), + flow.add(CreateResourceTask(), + LoadUserTask(), + UpdateUserRateTask(), + UpdateUserJobsTask()) + return taskflow.engines.load(flow, store=kwargs) + + +def get_delete_resource_flow(context, user_id, resource): + """Constructs and returns user task flow. + + :param context: The request context. + :param user_id: The ID of user. + :param resource: Object resource to delete. + """ + + flow_name = user_id + '_delete_resource' + flow = linear_flow.Flow(flow_name) + kwargs = { + 'context': context, + 'user_id': user_id, + 'resource': resource, + } + flow.add(DeleteResourceTask(), + CreateConsumptionTask(), + LoadUserTask(), + UpdateUserRateTask(), + UpdateUserJobsTask()) + return taskflow.engines.load(flow, store=kwargs) + + +def get_update_resource_flow(context, user_id, resource, values): + """Constructs and returns user task flow. + + :param context: The request context. + :param user_id: The ID of user. + :param resource: Object resource to update. + :param values: The values to update. + """ + + flow_name = user_id + '_update_resource' + flow = linear_flow.Flow(flow_name) + kwargs = { + 'context': context, + 'user_id': user_id, + 'resource': resource, + 'resource_bak': resource.to_dict(), + 'values': values, + } + flow.add(UpdateResourceTask(), + CreateConsumptionTask(), + LoadUserTask(), + UpdateUserRateTask(), UpdateUserJobsTask()) return taskflow.engines.load(flow, store=kwargs) diff --git a/bilean/engine/policy.py b/bilean/engine/policy.py index 56884c1..254e11c 100644 --- a/bilean/engine/policy.py +++ b/bilean/engine/policy.py @@ -83,6 +83,18 @@ class Policy(object): return cls._from_db_record(policy) + @classmethod + def load_default(cls, context, show_deleted=False): + '''Retrieve default policy from database.''' + filters = {'is_default': True} + policies = cls.load_all(context, filters=filters, + show_deleted=show_deleted) + if len(policies) > 1: + raise exception.MultipleDefaultPolicy() + + policy = None if len(policies) < 1 else policies[0] + return policy + @classmethod def load_all(cls, context, limit=None, marker=None, sort_keys=None, sort_dir=None, @@ -98,15 +110,6 @@ class Policy(object): return [cls._from_db_record(record) for record in records] - def find_rule(self, context, rtype): - '''Find the exact rule from self.rules by rtype''' - - for rule in self.rules: - if rtype == rule['type'].split('-')[0]: - return rule_base.Rule.load(context, rule_id=rule['id']) - - raise exception.RuleNotFound(rule_type=rtype) - def to_dict(self): policy_dict = { 'id': self.id, @@ -123,3 +126,12 @@ class Policy(object): def do_delete(self, context): db_api.policy_delete(context, self.id) return True + + def find_rule(self, context, rtype): + '''Find the exact rule from self.rules by rtype''' + + for rule in self.rules: + if rtype == rule['type'].split('-')[0]: + return rule_base.Rule.load(context, rule_id=rule['id']) + + raise exception.RuleNotFound(rule_type=rtype) diff --git a/bilean/engine/service.py b/bilean/engine/service.py index 5fc0b03..484c5b2 100644 --- a/bilean/engine/service.py +++ b/bilean/engine/service.py @@ -21,6 +21,7 @@ from oslo_log import log as logging import oslo_messaging from oslo_service import service from oslo_service import threadgroup +from oslo_utils import timeutils from bilean.common import consts from bilean.common import context as bilean_context @@ -287,10 +288,19 @@ class EngineService(service.Service): return user.to_dict() @request_context - def user_recharge(self, cnxt, user_id, value): + def user_recharge(self, cnxt, user_id, value, recharge_type=None, + timestamp=None, metadata=None): """Do recharge for specify user.""" - user = user_mod.User.load(cnxt, user_id=user_id) - user.do_recharge(cnxt, value) + try: + user = user_mod.User.load(cnxt, user_id=user_id) + except exception.UserNotFound as ex: + raise exception.BileanBadRequest(msg=six.text_type(ex)) + + recharge_type = recharge_type or consts.SELF_RECHARGE + timestamp = timestamp or timeutils.utcnow() + metadata = metadata or {} + user.do_recharge(cnxt, value, recharge_type=recharge_type, + timestamp=timestamp, metadata=metadata) # As user has been updated, the billing job for the user # should to be updated too. bilean_scheduler.notify(bilean_scheduler.UPDATE_JOBS, diff --git a/bilean/engine/user.py b/bilean/engine/user.py index 54ba477..8638f83 100644 --- a/bilean/engine/user.py +++ b/bilean/engine/user.py @@ -19,7 +19,6 @@ from bilean.common.i18n import _LI from bilean.common import utils from bilean.db import api as db_api from bilean.drivers import base as driver_base -from bilean.engine import event as event_mod from bilean import notifier as bilean_notifier from bilean.resources import base as resource_base @@ -39,6 +38,8 @@ class User(object): 'INIT', 'FREE', 'ACTIVE', 'WARNING', 'FREEZE', ) + ALLOW_DELAY_TIME = 10 + def __init__(self, user_id, **kwargs): self.id = user_id self.name = kwargs.get('name') @@ -212,66 +213,56 @@ class User(object): self.status_reason = reason self.store(context) - def update_with_resource(self, context, resource, - resource_action='create'): - '''Update user with resource''' + def update_rate(self, context, delta_rate, timestamp=None, delayed_cost=0): + """Update user's rate and update user status. - now = timeutils.utcnow() - extra_cost = 0 - if 'create' == resource_action: - d_rate = resource.rate - if resource.properties.get('created_at') is not None: - created_at = timeutils.parse_strtime( - resource.properties.get('created_at')) - extra_seconds = (now - created_at).total_seconds() - extra_cost = d_rate * extra_seconds - elif 'delete' == resource_action: - d_rate = -resource.rate - if resource.properties.get('deleted_at') is not None: - deleted_at = timeutils.parse_strtime( - resource.properties.get('deleted_at')) - extra_seconds = (now - deleted_at).total_seconds() - extra_cost = d_rate * extra_seconds - elif 'update' == resource_action: - d_rate = resource.d_rate - if resource.properties.get('updated_at') is not None: - updated_at = timeutils.parse_strtime( - resource.properties.get('updated_at')) - extra_seconds = (now - updated_at).total_seconds() - extra_cost = d_rate * extra_seconds + :param context: The request context. + :param delta_rate: Delta rate to change. + :param timestamp: The time that resource action occurs. + :param delayed_cost: User's action may be delayed by some reason, + adjust balance by delayed_cost. + """ - self._settle_account(context, extra_cost=extra_cost, - cause_resource=resource, - resource_action=resource_action) - self._change_user_rate(context, d_rate) - self.store(context) + if delta_rate == 0 and delayed_cost == 0: + return + + # Settle account before update rate + self._settle_account(context, timestamp=timestamp, + delayed_cost=delayed_cost) - def _change_user_rate(self, context, d_rate): - # Update the rate of user old_rate = self.rate - new_rate = old_rate + d_rate + new_rate = old_rate + delta_rate if old_rate == 0 and new_rate > 0: + # Set last_bill when status change to 'ACTIVE' from 'FREE' self.last_bill = timeutils.utcnow() + reason = _("Status change to 'ACTIVE' cause resource creation.") self.status = self.ACTIVE - elif d_rate < 0: + self.status_reason = reason + elif delta_rate < 0: if new_rate == 0 and self.balance >= 0: reason = _("Status change to 'FREE' because of resource " - "deleting.") + "deletion.") self.status = self.FREE self.status_reason = reason - elif self.status == self.WARNING and not self.notify_or_not(): + elif self.status == self.WARNING and not self._notify_or_not(): reason = _("Status change from 'WARNING' to 'ACTIVE' " - "because of resource deleting.") + "because of resource deletion.") self.status = self.ACTIVE self.status_reason = reason self.rate = new_rate + self.store(context) - def do_recharge(self, context, value): - '''Do recharge for user.''' - if self.rate > 0 and self.status != self.FREEZE: - self._settle_account(context) + def do_recharge(self, context, value, recharge_type=None, timestamp=None, + metadata=None): + """Recharge for user and update status. + + param context: The request context. + param value: Recharge value. + param recharge_type: Rechage type, 'Recharge'|'System bonus'. + param timestamp: Record when recharge action occurs. + param metadata: Some other keyword. + """ self.balance += value - if self.status == self.INIT and self.balance > 0: self.status = self.FREE self.status_reason = "Recharged" @@ -281,16 +272,22 @@ class User(object): self.status = self.FREE self.status_reason = reason elif self.status == self.WARNING: - if not self.notify_or_not(): + if not self._notify_or_not(): reason = _("Status change from 'WARNING' to 'ACTIVE' because " "of recharge.") self.status = self.ACTIVE self.status_reason = reason - self.store(context) - event_mod.record(context, self, action='recharge', value=value) - def notify_or_not(self): + # Create recharge record + values = {'user_id': self.id, + 'value': value, + 'type': recharge_type, + 'timestamp': timestamp, + 'metadata': metadata} + db_api.recharge_create(context, values) + + def _notify_or_not(self): '''Check if user should be notified.''' cfg.CONF.import_opt('prior_notify_time', 'bilean.scheduler.cron_scheduler', @@ -305,21 +302,22 @@ class User(object): db_api.user_delete(context, self.id) return True - def _settle_account(self, context, cause_resource=None, - resource_action=None, extra_cost=0): - if self.status not in [self.ACTIVE, self.WARNING]: + def _settle_account(self, context, timestamp=None, delayed_cost=0): + if self.rate == 0 and delayed_cost == 0: LOG.info(_LI("Ignore settlement action because user is in '%s' " "status."), self.status) return - now = timeutils.utcnow() - total_seconds = (now - self.last_bill).total_seconds() - cost = self.rate * total_seconds + extra_cost - self.balance -= cost - event_mod.record(context, self, timestamp=now, - cause_resource=cause_resource, - resource_action=resource_action, - extra_cost=extra_cost) - self.last_bill = now + + # Calculate user's cost before last_bill and now + cost = 0 + if self.rate > 0 and self.last_bill: + timestamp = timestamp or timeutils.utcnow() + total_seconds = (timestamp - self.last_bill).total_seconds() + cost = self.rate * total_seconds + total_cost = cost + delayed_cost + + self.balance -= total_cost + self.last_bill = timestamp def settle_account(self, context, task=None): '''Settle account for user.''' @@ -327,7 +325,7 @@ class User(object): notifier = bilean_notifier.Notifier() self._settle_account(context) - if task == 'notify' and self.notify_or_not(): + if task == 'notify' and self._notify_or_not(): self.status_reason = "The balance is almost used up" self.status = self.WARNING # Notify user diff --git a/bilean/resources/base.py b/bilean/resources/base.py index 1e3e6d5..6c015ba 100644 --- a/bilean/resources/base.py +++ b/bilean/resources/base.py @@ -14,8 +14,11 @@ from bilean.common import exception from bilean.common import utils from bilean.db import api as db_api +from bilean.engine import consumption as consumption_mod from bilean.engine import environment +from oslo_utils import timeutils + class Resource(object): """A resource is an object that refers to a physical resource. @@ -25,8 +28,16 @@ class Resource(object): something else. """ + ALLOW_DELAY_TIME = 10 + def __new__(cls, id, user_id, res_type, properties, **kwargs): - """Create a new resource of the appropriate class.""" + """Create a new resource of the appropriate class. + + :param id: The resource ID comes same as the real resource. + :param user_id: The user ID the resource belongs to. + :param properties: The properties of resource. + :param dict kwargs: Other keyword arguments for the resource. + """ if cls != Resource: ResourceClass = cls else: @@ -42,12 +53,17 @@ class Resource(object): self.rule_id = kwargs.get('rule_id') self.rate = kwargs.get('rate', 0) - self.d_rate = 0 + self.last_bill = kwargs.get('last_bill') self.created_at = kwargs.get('created_at') self.updated_at = kwargs.get('updated_at') self.deleted_at = kwargs.get('deleted_at') + # Properties pass to user to help settle account, not store to db + self.delta_rate = 0 + self.delayed_cost = 0 + self.consumption = None + def store(self, context): """Store the resource record into database table.""" @@ -57,22 +73,108 @@ class Resource(object): 'properties': self.properties, 'rule_id': self.rule_id, 'rate': self.rate, + 'last_bill': self.last_bill, 'created_at': self.created_at, 'updated_at': self.updated_at, 'deleted_at': self.deleted_at, } if self.created_at: - db_api.resource_update(context, self.id, values) + self._update(context, values) else: values.update(id=self.id) - resource = db_api.resource_create(context, values) - self.created_at = resource.created_at + self._create(context, values) return self.id def delete(self, context, soft_delete=True): '''Delete resource from db.''' + self._delete(context, soft_delete=soft_delete) + + def _create(self, context, values): + self.delta_rate = self.rate + if self.delta_rate == 0: + resource = db_api.resource_create(context, values) + self.created_at = resource.created_at + return + + now = timeutils.utcnow() + self.last_bill = now + create_time = self.properties.get('created_at') + if create_time is not None: + created_at = timeutils.parse_strtime(create_time) + delayed_seconds = (now - created_at).total_seconds() + # Engine handle resource creation is delayed because of something, + # we suppose less than ALLOW_DELAY_TIME is acceptable. + if delayed_seconds > self.ALLOW_DELAY_TIME: + self.delayed_cost = self.delta_rate * delayed_seconds + self.last_bill = created_at + + values.update(last_bill=self.last_bill) + resource = db_api.resource_create(context, values) + self.created_at = resource.created_at + + def _update(self, context, values): + if self.delta_rate == 0: + db_api.resource_update(context, self.id, values) + return + + update_time = self.properties.get('updated_at') + now = timeutils.utcnow() + updated_at = now + if update_time is not None: + updated_at = timeutils.parse_strtime(update_time) + delayed_seconds = (now - updated_at).total_seconds() + # Engine handle resource update is delayed because of something, + # we suppose less than ALLOW_DELAY_TIME is acceptable. + if delayed_seconds > self.ALLOW_DELAY_TIME: + self.delayed_cost = self.delta_rate * delayed_seconds + + # Generate consumption between lass bill and update time + old_rate = self.rate - self.delta_rate + cost = (updated_at - self.last_bill).total_seconds() * old_rate + params = {'resource_id': self.id, + 'resource_type': self.resource_type, + 'start_time': self.last_bill, + 'end_time': updated_at, + 'rate': old_rate, + 'cost': cost, + 'metadata': {'cause': 'Resource update'}} + self.consumption = consumption_mod.Consumption(self.user_id, **params) + + self.last_bill = updated_at + values.update(last_bill=updated_at) + db_api.resource_update(context, self.id, values) + + def _delete(self, context, soft_delete=True): + self.delta_rate = - self.rate + if self.delta_rate == 0: + db_api.resource_delete(context, self.id, soft_delete=soft_delete) + return + + delete_time = self.properties.get('deleted_at') + now = timeutils.utcnow() + deleted_at = now + if delete_time is not None: + deleted_at = timeutils.parse_strtime(delete_time) + delayed_seconds = (now - deleted_at).total_seconds() + # Engine handle resource deletion is delayed because of something, + # we suppose less than ALLOW_DELAY_TIME is acceptable. + if delayed_seconds > self.ALLOW_DELAY_TIME: + self.delayed_cost = self.delta_rate * delayed_seconds + + # Generate consumption between lass bill and delete time + cost = (deleted_at - self.last_bill).total_seconds() * self.rate + params = {'resource_id': self.id, + 'resource_type': self.resource_type, + 'start_time': self.last_bill, + 'end_time': deleted_at, + 'rate': self.rate, + 'cost': cost, + 'metadata': {'cause': 'Resource deletion'}} + self.consumption = consumption_mod.Consumption(self.user_id, **params) + + self.last_bill = deleted_at db_api.resource_delete(context, self.id, soft_delete=soft_delete) @classmethod @@ -84,6 +186,7 @@ class Resource(object): kwargs = { 'rule_id': record.rule_id, 'rate': record.rate, + 'last_bill': record.last_bill, 'created_at': record.created_at, 'updated_at': record.updated_at, 'deleted_at': record.deleted_at, @@ -139,6 +242,7 @@ class Resource(object): 'properties': self.properties, 'rule_id': self.rule_id, 'rate': self.rate, + 'last_bill': utils.format_time(self.last_bill), 'created_at': utils.format_time(self.created_at), 'updated_at': utils.format_time(self.updated_at), 'deleted_at': utils.format_time(self.deleted_at), diff --git a/bilean/scheduler/cron_scheduler.py b/bilean/scheduler/cron_scheduler.py index 6eccf6b..cd12cb3 100644 --- a/bilean/scheduler/cron_scheduler.py +++ b/bilean/scheduler/cron_scheduler.py @@ -199,7 +199,6 @@ class CronScheduler(object): if user.rate == 0: return False total_seconds = user.balance / user.rate - LOG.info(_LI("###########Fuck user: %s"), user.to_dict()) run_date = timeutils.utcnow() + timedelta(seconds=total_seconds) job_params = {'run_date': run_date} job_id = self._generate_job_id(user.id, self.FREEZE)