From c4546c1d5c59c3dfcf5329c9bcec2f614111b7b9 Mon Sep 17 00:00:00 2001 From: lvdongbing Date: Tue, 22 Mar 2016 02:10:08 -0400 Subject: [PATCH] Add user lock. Change-Id: Ia4d2069b910cdaba3b8666275d0290636d773f82 --- bilean/common/config.py | 6 ++ bilean/db/api.py | 15 ++++ bilean/db/sqlalchemy/api.py | 51 ++++++++++- .../migrate_repo/versions/001_bilean_init.py | 10 +++ bilean/db/sqlalchemy/models.py | 10 +++ bilean/engine/lock.py | 66 ++++++++++++++ bilean/engine/scheduler.py | 45 +++++++--- bilean/engine/service.py | 86 ++++++++++++++----- bilean/engine/user.py | 77 ++++++++++------- bilean/resources/base.py | 4 +- bilean/resources/os/nova/server.py | 7 +- 11 files changed, 313 insertions(+), 64 deletions(-) create mode 100644 bilean/engine/lock.py diff --git a/bilean/common/config.py b/bilean/common/config.py index 2956f33..8b98d5b 100644 --- a/bilean/common/config.py +++ b/bilean/common/config.py @@ -54,6 +54,12 @@ engine_opts = [ cfg.IntOpt('default_action_timeout', default=3600, help=_('Timeout in seconds for actions.')), + cfg.IntOpt('lock_retry_times', + default=50, + help=_('Number of times trying to grab a lock.')), + cfg.IntOpt('lock_retry_interval', + default=1, + help=_('Number of seconds between lock retries.')), ] rpc_opts = [ diff --git a/bilean/db/api.py b/bilean/db/api.py index 9264958..1c19494 100644 --- a/bilean/db/api.py +++ b/bilean/db/api.py @@ -41,6 +41,7 @@ def db_version(engine): return IMPL.db_version(engine) +# users def user_get(context, user_id, show_deleted=False, project_safe=True): return IMPL.user_get(context, user_id, show_deleted=show_deleted, @@ -68,6 +69,7 @@ def user_get_all(context, show_deleted=False, limit=None, filters=filters) +# rules def rule_get(context, rule_id, show_deleted=False): return IMPL.rule_get(context, rule_id, show_deleted=False) @@ -93,6 +95,7 @@ def rule_delete(context, rule_id): return IMPL.rule_delete(context, rule_id) +# resources def resource_get(context, resource_id, show_deleted=False, project_safe=True): return IMPL.resource_get(context, resource_id, show_deleted=show_deleted, @@ -121,6 +124,7 @@ def resource_delete(context, resource_id): IMPL.resource_delete(context, resource_id) +# events def event_get(context, event_id, project_safe=True): return IMPL.event_get(context, event_id, project_safe=project_safe) @@ -145,6 +149,7 @@ def event_delete(context, event_id): return IMPL.event_delete(context, event_id) +# jobs def job_create(context, values): return IMPL.job_create(context, values) @@ -157,6 +162,7 @@ def job_delete(context, job_id): return IMPL.job_delete(context, job_id) +# policies def policy_get(context, policy_id, show_deleted=False): return IMPL.policy_get(context, policy_id, show_deleted=False) @@ -178,3 +184,12 @@ def policy_update(context, policy_id, values): def policy_delete(context, policy_id): return IMPL.policy_delete(context, policy_id) + + +# locks +def user_lock_acquire(user_id, engine_id): + return IMPL.user_lock_acquire(user_id, engine_id) + + +def user_lock_release(user_id, engine_id=None): + return IMPL.user_lock_release(user_id, engine_id=engine_id) diff --git a/bilean/db/sqlalchemy/api.py b/bilean/db/sqlalchemy/api.py index 11f5c78..2c0d3a5 100644 --- a/bilean/db/sqlalchemy/api.py +++ b/bilean/db/sqlalchemy/api.py @@ -24,6 +24,7 @@ from sqlalchemy.orm.session import Session from bilean.common import consts from bilean.common import exception +from bilean.common.i18n import _LE from bilean.db.sqlalchemy import filters as db_filters from bilean.db.sqlalchemy import migration from bilean.db.sqlalchemy import models @@ -115,6 +116,7 @@ def db_version(engine): return migration.db_version(engine) +# users def user_get(context, user_id, show_deleted=False, project_safe=True): query = model_query(context, models.User) user = query.get(user_id) @@ -189,6 +191,7 @@ def user_get_all(context, show_deleted=False, limit=None, default_sort_keys=['id']).all() +# rules def rule_get(context, rule_id, show_deleted=False): query = model_query(context, models.Rule) rule = query.filter_by(id=rule_id).first() @@ -252,6 +255,7 @@ def rule_delete(context, rule_id): session.flush() +# resources def resource_get(context, resource_id, show_deleted=False, project_safe=True): query = model_query(context, models.Resource) resource = query.get(resource_id) @@ -314,7 +318,7 @@ def resource_update(context, resource_id, values): def resource_delete(context, resource_id, soft_delete=True): - resource = resource_get(context, resource_id) + resource = resource_get(context, resource_id, project_safe=False) if resource is None: return @@ -327,6 +331,7 @@ def resource_delete(context, resource_id, soft_delete=True): session.flush() +# events def event_get(context, event_id, project_safe=True): query = model_query(context, models.Event) event = query.get(event_id) @@ -382,6 +387,7 @@ def event_create(context, values): return event_ref +# jobs def job_create(context, values): job_ref = models.Job() job_ref.update(values) @@ -408,6 +414,7 @@ def job_delete(context, job_id): session.flush() +# policies def policy_get(context, policy_id, show_deleted=False): query = model_query(context, models.Policy) policy = query.get(policy_id) @@ -468,3 +475,45 @@ def policy_delete(context, policy_id): session = Session.object_session(policy) policy.soft_delete(session=session) session.flush() + + +# locks +def user_lock_acquire(user_id, engine_id): + '''Acquire lock on a user. + + :param user_id: ID of the user. + :param engine_id: ID of the engine which wants to lock the user. + :return: A user lock if success else False. + ''' + session = get_session() + session.begin() + lock = session.query(models.UserLock).get(user_id) + if lock is not None: + return False + else: + try: + lock = models.UserLock(user_id=user_id, engine_id=engine_id) + session.add(lock) + except Exception as ex: + LOG.error(_LE('Error: %s'), six.text_type(ex)) + return False + + session.commit() + return lock + + +def user_lock_release(user_id, engine_id=None): + '''Release lock on a user. + + :param user_id: ID of the user. + :return: True indicates successful release, False indicates failure. + ''' + session = get_session() + session.begin() + lock = session.query(models.UserLock).get(user_id) + if lock is None: + session.commit() + return False + session.delete(lock) + session.commit() + return True diff --git a/bilean/db/sqlalchemy/migrate_repo/versions/001_bilean_init.py b/bilean/db/sqlalchemy/migrate_repo/versions/001_bilean_init.py index 1c42ad0..4e60e90 100644 --- a/bilean/db/sqlalchemy/migrate_repo/versions/001_bilean_init.py +++ b/bilean/db/sqlalchemy/migrate_repo/versions/001_bilean_init.py @@ -109,12 +109,22 @@ def upgrade(migrate_engine): mysql_charset='utf8' ) + user_lock = sqlalchemy.Table( + 'user_lock', meta, + sqlalchemy.Column('user_id', sqlalchemy.String(36), + primary_key=True, nullable=False), + sqlalchemy.Column('engine_id', sqlalchemy.String(36)), + mysql_engine='InnoDB', + mysql_charset='utf8' + ) + tables = ( policy, user, rule, resource, event, + user_lock, ) for index, table in enumerate(tables): diff --git a/bilean/db/sqlalchemy/models.py b/bilean/db/sqlalchemy/models.py index 1bf13bc..e320e0c 100644 --- a/bilean/db/sqlalchemy/models.py +++ b/bilean/db/sqlalchemy/models.py @@ -189,3 +189,13 @@ class Job(BASE, BileanBase): engine_id = sqlalchemy.Column(sqlalchemy.String(36)) job_type = sqlalchemy.Column(sqlalchemy.String(10)) parameters = sqlalchemy.Column(types.Dict()) + + +class UserLock(BASE, BileanBase): + """User locks for engines.""" + + __tablename__ = 'user_lock' + + user_id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True, + nullable=False) + engine_id = sqlalchemy.Column(sqlalchemy.String(36)) diff --git a/bilean/engine/lock.py b/bilean/engine/lock.py new file mode 100644 index 0000000..fc0917f --- /dev/null +++ b/bilean/engine/lock.py @@ -0,0 +1,66 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import eventlet +from oslo_config import cfg +from oslo_log import log as logging + +from bilean.common.i18n import _ +from bilean.db import api as db_api + +CONF = cfg.CONF + +CONF.import_opt('lock_retry_times', 'bilean.common.config') +CONF.import_opt('lock_retry_interval', 'bilean.common.config') + +LOG = logging.getLogger(__name__) + + +def sleep(sleep_time): + '''Interface for sleeping.''' + + eventlet.sleep(sleep_time) + + +def user_lock_acquire(user_id, engine_id): + """Try to lock the specified user. + + :param user_id: ID of the user to be locked. + :param engine_id: ID of the engine which wants to lock the user. + :returns: True if lock is acquired, or False otherwise. + """ + + user_lock = db_api.user_lock_acquire(user_id, engine_id) + if user_lock: + return True + + retries = cfg.CONF.lock_retry_times + retry_interval = cfg.CONF.lock_retry_interval + + while retries > 0: + sleep(retry_interval) + LOG.debug(_('Acquire lock for user %s again'), user_id) + user_lock = db_api.user_lock_acquire(user_id, engine_id) + if user_lock: + return True + retries = retries - 1 + + return False + + +def user_lock_release(user_id, engine_id=None): + """Release the lock on the specified user. + + :param user_id: ID of the user to be released. + :param engine_id: ID of the engine which locked the user. + """ + return db_api.user_lock_release(user_id, engine_id=engine_id) diff --git a/bilean/engine/scheduler.py b/bilean/engine/scheduler.py index fe2b7b7..f055b8d 100644 --- a/bilean/engine/scheduler.py +++ b/bilean/engine/scheduler.py @@ -14,9 +14,11 @@ from bilean.common import context as bilean_context from bilean.common import exception from bilean.common.i18n import _ +from bilean.common.i18n import _LE from bilean.common.i18n import _LI from bilean.common import utils from bilean.db import api as db_api +from bilean.engine import lock as bilean_lock from bilean.engine import user as user_mod from bilean import notifier @@ -35,8 +37,8 @@ scheduler_opts = [ help=_('The time zone of job, default is utc')), cfg.IntOpt('prior_notify_time', default=3, - help=_("The days notify user before user's balance is used up, " - "default is 3 days.")), + help=_('Time in hours before notify user when the balance of ' + 'user is almost used up.')), cfg.IntOpt('misfire_grace_time', default=3600, help=_('Seconds after the designated run time that the job is ' @@ -175,26 +177,38 @@ class BileanScheduler(object): return job is not None def _notify_task(self, user_id): + res = bilean_lock.user_lock_acquire(user_id, self.engine_id) + if not res: + LOG.error(_LE('Failed grabbing the lock for user %s'), user_id) + return + admin_context = bilean_context.get_admin_context() user = user_mod.User.load(admin_context, user_id=user_id) - reason = "The balance is almost use up" - msg = {'user': user.id, 'notification': reason} - self.notifier.info('billing.notify', msg) - if user.status != user.FREEZE and user.rate > 0: + if user.notify_or_not(): user.do_bill(admin_context) + reason = "The balance is almost use up" + user.set_status(admin_context, user.WARNING, reason) + msg = {'user': user_id, 'notification': reason} + self.notifier.info('billing.notify', msg) try: db_api.job_delete( admin_context, self._generate_job_id(user.id, 'notify')) except exception.NotFound as e: LOG.warn(_("Failed in deleting job: %s") % six.text_type(e)) - user.set_status(admin_context, user.WARNING, reason) self.update_user_job(user) + bilean_lock.user_lock_release(user_id, engine_id=self.engine_id) + def _daily_task(self, user_id): + res = bilean_lock.user_lock_acquire(user_id, self.engine_id) + if not res: + LOG.error(_LE('Failed grabbing the lock for user %s'), user_id) + return + admin_context = bilean_context.get_admin_context() user = user_mod.User.load(admin_context, user_id=user_id) - if user.status != user.FREEZE and user.rate > 0: - user.do_bill(admin_context) + user.do_bill(admin_context) + try: db_api.job_delete( admin_context, self._generate_job_id(user.id, 'daily')) @@ -202,11 +216,18 @@ class BileanScheduler(object): LOG.warn(_("Failed in deleting job: %s") % six.text_type(e)) self.update_user_job(user) + bilean_lock.user_lock_release(user_id, engine_id=self.engine_id) + def _freeze_task(self, user_id): + res = bilean_lock.user_lock_acquire(user_id, self.engine_id) + if not res: + LOG.error(_LE('Failed grabbing the lock for user %s'), user_id) + return + admin_context = bilean_context.get_admin_context() user = user_mod.User.load(admin_context, user_id=user_id) - if user.status != user.FREEZE and user.rate > 0: - user.do_bill(admin_context) + user.do_bill(admin_context) + try: db_api.job_delete( admin_context, self._generate_job_id(user.id, 'freeze')) @@ -214,6 +235,8 @@ class BileanScheduler(object): LOG.warn(_("Failed in deleting job: %s") % six.text_type(e)) self.update_user_job(user) + bilean_lock.user_lock_release(user_id, engine_id=self.engine_id) + def _add_notify_job(self, user): if not user.rate: return False diff --git a/bilean/engine/service.py b/bilean/engine/service.py index 417f803..2d7e591 100644 --- a/bilean/engine/service.py +++ b/bilean/engine/service.py @@ -29,6 +29,7 @@ from bilean.common import schema from bilean.common import utils from bilean.engine import environment from bilean.engine import event as event_mod +from bilean.engine import lock as bilean_lock from bilean.engine import policy as policy_mod from bilean.engine import scheduler from bilean.engine import user as user_mod @@ -152,11 +153,19 @@ class EngineService(service.Service): @request_context def user_recharge(self, cnxt, user_id, value): """Do recharge for specify user.""" - user = user_mod.User.load(cnxt, user_id=user_id) - user.do_recharge(cnxt, value) - # As user has been updated, the billing job for the user - # should to be updated too. - self.scheduler.update_user_job(user) + res = bilean_lock.user_lock_acquire(user_id, self.engine_id) + if not res: + LOG.error(_LE('Failed grabbing the lock for user %s'), res.user_id) + return False + try: + user = user_mod.User.load(cnxt, user_id=user_id) + user.do_recharge(cnxt, value) + # As user has been updated, the billing job for the user + # should to be updated too. + self.scheduler.update_user_job(user) + finally: + bilean_lock.user_lock_release(user_id, engine_id=self.engine_id) + return user.to_dict() def user_delete(self, cnxt, user_id): @@ -182,8 +191,14 @@ class EngineService(service.Service): 'policy': policy_id} raise exception.BileanBadRequest(msg=msg) + res = bilean_lock.user_lock_acquire(user_id, self.engine_id) + if not res: + LOG.error(_LE('Failed grabbing the lock for user %s'), res.user_id) + return False user.policy_id = policy_id user.store(cnxt) + bilean_lock.user_lock_release(user_id, engine_id=self.engine_id) + return user.to_dict() @request_context @@ -294,12 +309,21 @@ class EngineService(service.Service): resource.rate = rule.get_price(resource) # Update user with resource - user.update_with_resource(admin_context, resource) - resource.store(admin_context) + res = bilean_lock.user_lock_acquire(user.id, self.engine_id) + if not res: + LOG.error(_LE('Failed grabbing the lock for user %s'), user.id) + return + try: + # Reload user to ensure the info is latest. + user = user_mod.User.load(admin_context, user_id=user_id) + user.update_with_resource(admin_context, resource) + resource.store(admin_context) - # As the rate of user has changed, the billing job for the user - # should change too. - self.scheduler.update_user_job(user) + # As the rate of user has changed, the billing job for the user + # should change too. + self.scheduler.update_user_job(user) + finally: + bilean_lock.user_lock_release(user.id, engine_id=self.engine_id) return resource.to_dict() @@ -335,26 +359,48 @@ class EngineService(service.Service): res.properties = resource['properties'] rule = rule_base.Rule.load(admin_context, rule_id=res.rule_id) res.rate = rule.get_price(res) - res.store(admin_context) res.d_rate = res.rate - old_rate - user = user_mod.User.load(admin_context, res.user_id) - user.update_with_resource(admin_context, res, action='update') + result = bilean_lock.user_lock_acquire(res.user_id, self.engine_id) + if not result: + LOG.error(_LE('Failed grabbing the lock for user %s'), res.user_id) + return False + try: + user = user_mod.User.load(admin_context, res.user_id) + user.update_with_resource(admin_context, res, action='update') - self.scheduler.update_user_job(user) + res.store(admin_context) + + self.scheduler.update_user_job(user) + finally: + bilean_lock.user_lock_release(user.id, engine_id=self.engine_id) + + return True def resource_delete(self, cnxt, resource_id): """Do resource delete""" admin_context = bilean_context.get_admin_context() - res = resource_base.Resource.load( - admin_context, resource_id=resource_id, project_safe=False) + try: + res = resource_base.Resource.load( + admin_context, resource_id=resource_id) + except exception.ResourceNotFound: + return False - user = user_mod.User.load(admin_context, user_id=res.user_id) - user.update_with_resource(admin_context, res, action='delete') + result = bilean_lock.user_lock_acquire(res.user_id, self.engine_id) + if not result: + LOG.error(_LE('Failed grabbing the lock for user %s'), res.user_id) + return False + try: + user = user_mod.User.load(admin_context, user_id=res.user_id) + user.update_with_resource(admin_context, res, action='delete') - self.scheduler.update_user_job(user) + self.scheduler.update_user_job(user) - res.delete(admin_context) + res.delete(admin_context) + finally: + bilean_lock.user_lock_release(user.id, engine_id=self.engine_id) + + return True @request_context def event_list(self, cnxt, user_id=None, limit=None, marker=None, diff --git a/bilean/engine/user.py b/bilean/engine/user.py index 95df5bb..6505b58 100644 --- a/bilean/engine/user.py +++ b/bilean/engine/user.py @@ -15,6 +15,7 @@ import six from bilean.common import exception from bilean.common.i18n import _ +from bilean.common.i18n import _LI from bilean.common import utils from bilean.db import api as db_api from bilean.drivers import base as driver_base @@ -187,18 +188,19 @@ class User(object): self.status_reason = reason self.store(context) - def update_with_resource(self, context, resource, action='create'): + def update_with_resource(self, context, resource, do_bill=True, + action='create'): '''Update user with resource''' + if do_bill: + self.do_bill(context) + if 'create' == action: d_rate = resource.rate - if self.rate > 0: - self.do_bill(context) elif 'delete' == action: - self.do_bill(context) d_rate = -resource.rate elif 'update' == action: - self.do_bill(context) d_rate = resource.d_rate + self._change_user_rate(context, d_rate) self.store(context) @@ -211,13 +213,16 @@ class User(object): if d_rate > 0 and self.status == self.FREE: self.status = self.ACTIVE elif d_rate < 0: - if new_rate == 0 and self.balance > 0: + if new_rate == 0 and self.balance >= 0: self.status = self.FREE + elif new_rate == 0 and self.balance < 0: + self.status = self.FREEZE elif self.status == self.WARNING: - p_time = cfg.CONF.scheduler.prior_notify_time * 3600 - rest_usage = p_time * new_rate - if self.balance > rest_usage: + if not self.notify_or_not(): + reason = _("Status change from 'warning' to 'active' " + "because of resource deleting.") self.status = self.ACTIVE + self.status_reason = reason self.rate = new_rate def do_recharge(self, context, value): @@ -232,23 +237,19 @@ class User(object): "of recharge.") self.set_status(context, self.FREE, reason=reason) elif self.status == self.WARNING: - prior_notify_time = cfg.CONF.scheduler.prior_notify_time * 3600 - rest_usage = prior_notify_time * self.rate - if self.balance > rest_usage: - reason = _("Status change from warning to active because " + if not self.notify_or_not(): + reason = _("Status change from 'warning' to 'active' because " "of recharge.") self.set_status(context, self.ACTIVE, reason=reason) event_mod.record(context, self.id, action='recharge', value=value) - def _freeze(self, context, reason=None): - '''Freeze user when balance overdraft.''' - LOG.info(_("Freeze user %(user_id), reason: %(reason)s"), - {'user_id': self.id, 'reason': reason}) - resources = resource_base.Resource.load_all( - context, user_id=self.id, project_safe=False) - for resource in resources: - resource.do_delete() - self.set_status(context, self.FREEZE, reason) + def notify_or_not(self): + '''Check if user should be notified.''' + prior_notify_time = cfg.CONF.scheduler.prior_notify_time * 3600 + rest_usage = prior_notify_time * self.rate + if self.balance > rest_usage: + return False + return True def do_delete(self, context): db_api.user_delete(context, self.id) @@ -256,13 +257,29 @@ class User(object): def do_bill(self, context): '''Do bill once, pay the cost until now.''' + if self.status not in [self.ACTIVE, self.WARNING]: + LOG.info(_LI("Ignore bill action because user is in '%s' " + "status."), self.status) + return + now = timeutils.utcnow() total_seconds = (now - self.last_bill).total_seconds() - self.balance = self.balance - self.rate * total_seconds - self.last_bill = now - if self.balance <= 0: - self._freeze(context, reason="Balance overdraft") - self.store(context) - event_mod.record(context, self.id, - action='charge', - seconds=total_seconds) + cost = self.rate * total_seconds + if cost > 0: + self.balance -= cost + self.last_bill = now + if self.balance <= 0: + self._freeze(context, reason="Balance overdraft") + self.store(context) + event_mod.record(context, self.id, action='charge', + seconds=total_seconds) + + def _freeze(self, context, reason=None): + '''Freeze user when balance overdraft.''' + LOG.info(_LI("Freeze user %(user_id)s, reason: %(reason)s"), + {'user_id': self.id, 'reason': reason}) + resources = resource_base.Resource.load_all( + context, user_id=self.id, project_safe=False) + for resource in resources: + if resource.do_delete(context): + self._change_user_rate(context, -resource.rate) diff --git a/bilean/resources/base.py b/bilean/resources/base.py index 8511768..b7c5e3e 100644 --- a/bilean/resources/base.py +++ b/bilean/resources/base.py @@ -96,6 +96,8 @@ class Resource(object): def load(cls, context, resource_id=None, resource=None, show_deleted=False, project_safe=True): '''Retrieve a resource from database.''' + if context.is_admin: + project_safe = False if resource is None: resource = db_api.resource_get(context, resource_id, show_deleted=show_deleted, @@ -145,7 +147,7 @@ class Resource(object): return NotImplemented - def do_delete(self, ignore_missing=True, timeout=None): + def do_delete(self, context, ignore_missing=True, timeout=None): '''Delete resource from other services.''' return NotImplemented diff --git a/bilean/resources/os/nova/server.py b/bilean/resources/os/nova/server.py index 0483900..81509c9 100644 --- a/bilean/resources/os/nova/server.py +++ b/bilean/resources/os/nova/server.py @@ -14,6 +14,7 @@ import six from bilean.common.i18n import _LE +from bilean.db import api as db_api from bilean.drivers import base as driver_base from bilean.resources import base @@ -35,9 +36,13 @@ class ServerResource(base.Resource): # TODO(ldb) return NotImplemented - def do_delete(self, ignore_missing=True, timeout=None): + def do_delete(self, context, ignore_missing=True, timeout=None): '''Delete resource from other services.''' + # Delete resource from db + db_api.resource_delete(context, self.id) + + # Delete resource from nova novaclient = driver_base.BileanDriver().compute() try: novaclient.server_delete(self.id, ignore_missing=ignore_missing)