Add user lock.

Change-Id: Ia4d2069b910cdaba3b8666275d0290636d773f82
This commit is contained in:
lvdongbing 2016-03-22 02:10:08 -04:00
parent df6044d409
commit c4546c1d5c
11 changed files with 313 additions and 64 deletions

View File

@ -54,6 +54,12 @@ engine_opts = [
cfg.IntOpt('default_action_timeout',
default=3600,
help=_('Timeout in seconds for actions.')),
cfg.IntOpt('lock_retry_times',
default=50,
help=_('Number of times trying to grab a lock.')),
cfg.IntOpt('lock_retry_interval',
default=1,
help=_('Number of seconds between lock retries.')),
]
rpc_opts = [

View File

@ -41,6 +41,7 @@ def db_version(engine):
return IMPL.db_version(engine)
# users
def user_get(context, user_id, show_deleted=False, project_safe=True):
return IMPL.user_get(context, user_id,
show_deleted=show_deleted,
@ -68,6 +69,7 @@ def user_get_all(context, show_deleted=False, limit=None,
filters=filters)
# rules
def rule_get(context, rule_id, show_deleted=False):
return IMPL.rule_get(context, rule_id, show_deleted=False)
@ -93,6 +95,7 @@ def rule_delete(context, rule_id):
return IMPL.rule_delete(context, rule_id)
# resources
def resource_get(context, resource_id, show_deleted=False, project_safe=True):
return IMPL.resource_get(context, resource_id,
show_deleted=show_deleted,
@ -121,6 +124,7 @@ def resource_delete(context, resource_id):
IMPL.resource_delete(context, resource_id)
# events
def event_get(context, event_id, project_safe=True):
return IMPL.event_get(context, event_id, project_safe=project_safe)
@ -145,6 +149,7 @@ def event_delete(context, event_id):
return IMPL.event_delete(context, event_id)
# jobs
def job_create(context, values):
return IMPL.job_create(context, values)
@ -157,6 +162,7 @@ def job_delete(context, job_id):
return IMPL.job_delete(context, job_id)
# policies
def policy_get(context, policy_id, show_deleted=False):
return IMPL.policy_get(context, policy_id, show_deleted=False)
@ -178,3 +184,12 @@ def policy_update(context, policy_id, values):
def policy_delete(context, policy_id):
return IMPL.policy_delete(context, policy_id)
# locks
def user_lock_acquire(user_id, engine_id):
return IMPL.user_lock_acquire(user_id, engine_id)
def user_lock_release(user_id, engine_id=None):
return IMPL.user_lock_release(user_id, engine_id=engine_id)

View File

@ -24,6 +24,7 @@ from sqlalchemy.orm.session import Session
from bilean.common import consts
from bilean.common import exception
from bilean.common.i18n import _LE
from bilean.db.sqlalchemy import filters as db_filters
from bilean.db.sqlalchemy import migration
from bilean.db.sqlalchemy import models
@ -115,6 +116,7 @@ def db_version(engine):
return migration.db_version(engine)
# users
def user_get(context, user_id, show_deleted=False, project_safe=True):
query = model_query(context, models.User)
user = query.get(user_id)
@ -189,6 +191,7 @@ def user_get_all(context, show_deleted=False, limit=None,
default_sort_keys=['id']).all()
# rules
def rule_get(context, rule_id, show_deleted=False):
query = model_query(context, models.Rule)
rule = query.filter_by(id=rule_id).first()
@ -252,6 +255,7 @@ def rule_delete(context, rule_id):
session.flush()
# resources
def resource_get(context, resource_id, show_deleted=False, project_safe=True):
query = model_query(context, models.Resource)
resource = query.get(resource_id)
@ -314,7 +318,7 @@ def resource_update(context, resource_id, values):
def resource_delete(context, resource_id, soft_delete=True):
resource = resource_get(context, resource_id)
resource = resource_get(context, resource_id, project_safe=False)
if resource is None:
return
@ -327,6 +331,7 @@ def resource_delete(context, resource_id, soft_delete=True):
session.flush()
# events
def event_get(context, event_id, project_safe=True):
query = model_query(context, models.Event)
event = query.get(event_id)
@ -382,6 +387,7 @@ def event_create(context, values):
return event_ref
# jobs
def job_create(context, values):
job_ref = models.Job()
job_ref.update(values)
@ -408,6 +414,7 @@ def job_delete(context, job_id):
session.flush()
# policies
def policy_get(context, policy_id, show_deleted=False):
query = model_query(context, models.Policy)
policy = query.get(policy_id)
@ -468,3 +475,45 @@ def policy_delete(context, policy_id):
session = Session.object_session(policy)
policy.soft_delete(session=session)
session.flush()
# locks
def user_lock_acquire(user_id, engine_id):
'''Acquire lock on a user.
:param user_id: ID of the user.
:param engine_id: ID of the engine which wants to lock the user.
:return: A user lock if success else False.
'''
session = get_session()
session.begin()
lock = session.query(models.UserLock).get(user_id)
if lock is not None:
return False
else:
try:
lock = models.UserLock(user_id=user_id, engine_id=engine_id)
session.add(lock)
except Exception as ex:
LOG.error(_LE('Error: %s'), six.text_type(ex))
return False
session.commit()
return lock
def user_lock_release(user_id, engine_id=None):
'''Release lock on a user.
:param user_id: ID of the user.
:return: True indicates successful release, False indicates failure.
'''
session = get_session()
session.begin()
lock = session.query(models.UserLock).get(user_id)
if lock is None:
session.commit()
return False
session.delete(lock)
session.commit()
return True

View File

@ -109,12 +109,22 @@ def upgrade(migrate_engine):
mysql_charset='utf8'
)
user_lock = sqlalchemy.Table(
'user_lock', meta,
sqlalchemy.Column('user_id', sqlalchemy.String(36),
primary_key=True, nullable=False),
sqlalchemy.Column('engine_id', sqlalchemy.String(36)),
mysql_engine='InnoDB',
mysql_charset='utf8'
)
tables = (
policy,
user,
rule,
resource,
event,
user_lock,
)
for index, table in enumerate(tables):

View File

@ -189,3 +189,13 @@ class Job(BASE, BileanBase):
engine_id = sqlalchemy.Column(sqlalchemy.String(36))
job_type = sqlalchemy.Column(sqlalchemy.String(10))
parameters = sqlalchemy.Column(types.Dict())
class UserLock(BASE, BileanBase):
"""User locks for engines."""
__tablename__ = 'user_lock'
user_id = sqlalchemy.Column(sqlalchemy.String(36), primary_key=True,
nullable=False)
engine_id = sqlalchemy.Column(sqlalchemy.String(36))

66
bilean/engine/lock.py Normal file
View File

@ -0,0 +1,66 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
from oslo_config import cfg
from oslo_log import log as logging
from bilean.common.i18n import _
from bilean.db import api as db_api
CONF = cfg.CONF
CONF.import_opt('lock_retry_times', 'bilean.common.config')
CONF.import_opt('lock_retry_interval', 'bilean.common.config')
LOG = logging.getLogger(__name__)
def sleep(sleep_time):
'''Interface for sleeping.'''
eventlet.sleep(sleep_time)
def user_lock_acquire(user_id, engine_id):
"""Try to lock the specified user.
:param user_id: ID of the user to be locked.
:param engine_id: ID of the engine which wants to lock the user.
:returns: True if lock is acquired, or False otherwise.
"""
user_lock = db_api.user_lock_acquire(user_id, engine_id)
if user_lock:
return True
retries = cfg.CONF.lock_retry_times
retry_interval = cfg.CONF.lock_retry_interval
while retries > 0:
sleep(retry_interval)
LOG.debug(_('Acquire lock for user %s again'), user_id)
user_lock = db_api.user_lock_acquire(user_id, engine_id)
if user_lock:
return True
retries = retries - 1
return False
def user_lock_release(user_id, engine_id=None):
"""Release the lock on the specified user.
:param user_id: ID of the user to be released.
:param engine_id: ID of the engine which locked the user.
"""
return db_api.user_lock_release(user_id, engine_id=engine_id)

View File

@ -14,9 +14,11 @@
from bilean.common import context as bilean_context
from bilean.common import exception
from bilean.common.i18n import _
from bilean.common.i18n import _LE
from bilean.common.i18n import _LI
from bilean.common import utils
from bilean.db import api as db_api
from bilean.engine import lock as bilean_lock
from bilean.engine import user as user_mod
from bilean import notifier
@ -35,8 +37,8 @@ scheduler_opts = [
help=_('The time zone of job, default is utc')),
cfg.IntOpt('prior_notify_time',
default=3,
help=_("The days notify user before user's balance is used up, "
"default is 3 days.")),
help=_('Time in hours before notify user when the balance of '
'user is almost used up.')),
cfg.IntOpt('misfire_grace_time',
default=3600,
help=_('Seconds after the designated run time that the job is '
@ -175,26 +177,38 @@ class BileanScheduler(object):
return job is not None
def _notify_task(self, user_id):
res = bilean_lock.user_lock_acquire(user_id, self.engine_id)
if not res:
LOG.error(_LE('Failed grabbing the lock for user %s'), user_id)
return
admin_context = bilean_context.get_admin_context()
user = user_mod.User.load(admin_context, user_id=user_id)
reason = "The balance is almost use up"
msg = {'user': user.id, 'notification': reason}
self.notifier.info('billing.notify', msg)
if user.status != user.FREEZE and user.rate > 0:
if user.notify_or_not():
user.do_bill(admin_context)
reason = "The balance is almost use up"
user.set_status(admin_context, user.WARNING, reason)
msg = {'user': user_id, 'notification': reason}
self.notifier.info('billing.notify', msg)
try:
db_api.job_delete(
admin_context, self._generate_job_id(user.id, 'notify'))
except exception.NotFound as e:
LOG.warn(_("Failed in deleting job: %s") % six.text_type(e))
user.set_status(admin_context, user.WARNING, reason)
self.update_user_job(user)
bilean_lock.user_lock_release(user_id, engine_id=self.engine_id)
def _daily_task(self, user_id):
res = bilean_lock.user_lock_acquire(user_id, self.engine_id)
if not res:
LOG.error(_LE('Failed grabbing the lock for user %s'), user_id)
return
admin_context = bilean_context.get_admin_context()
user = user_mod.User.load(admin_context, user_id=user_id)
if user.status != user.FREEZE and user.rate > 0:
user.do_bill(admin_context)
try:
db_api.job_delete(
admin_context, self._generate_job_id(user.id, 'daily'))
@ -202,11 +216,18 @@ class BileanScheduler(object):
LOG.warn(_("Failed in deleting job: %s") % six.text_type(e))
self.update_user_job(user)
bilean_lock.user_lock_release(user_id, engine_id=self.engine_id)
def _freeze_task(self, user_id):
res = bilean_lock.user_lock_acquire(user_id, self.engine_id)
if not res:
LOG.error(_LE('Failed grabbing the lock for user %s'), user_id)
return
admin_context = bilean_context.get_admin_context()
user = user_mod.User.load(admin_context, user_id=user_id)
if user.status != user.FREEZE and user.rate > 0:
user.do_bill(admin_context)
try:
db_api.job_delete(
admin_context, self._generate_job_id(user.id, 'freeze'))
@ -214,6 +235,8 @@ class BileanScheduler(object):
LOG.warn(_("Failed in deleting job: %s") % six.text_type(e))
self.update_user_job(user)
bilean_lock.user_lock_release(user_id, engine_id=self.engine_id)
def _add_notify_job(self, user):
if not user.rate:
return False

View File

@ -29,6 +29,7 @@ from bilean.common import schema
from bilean.common import utils
from bilean.engine import environment
from bilean.engine import event as event_mod
from bilean.engine import lock as bilean_lock
from bilean.engine import policy as policy_mod
from bilean.engine import scheduler
from bilean.engine import user as user_mod
@ -152,11 +153,19 @@ class EngineService(service.Service):
@request_context
def user_recharge(self, cnxt, user_id, value):
"""Do recharge for specify user."""
res = bilean_lock.user_lock_acquire(user_id, self.engine_id)
if not res:
LOG.error(_LE('Failed grabbing the lock for user %s'), res.user_id)
return False
try:
user = user_mod.User.load(cnxt, user_id=user_id)
user.do_recharge(cnxt, value)
# As user has been updated, the billing job for the user
# should to be updated too.
self.scheduler.update_user_job(user)
finally:
bilean_lock.user_lock_release(user_id, engine_id=self.engine_id)
return user.to_dict()
def user_delete(self, cnxt, user_id):
@ -182,8 +191,14 @@ class EngineService(service.Service):
'policy': policy_id}
raise exception.BileanBadRequest(msg=msg)
res = bilean_lock.user_lock_acquire(user_id, self.engine_id)
if not res:
LOG.error(_LE('Failed grabbing the lock for user %s'), res.user_id)
return False
user.policy_id = policy_id
user.store(cnxt)
bilean_lock.user_lock_release(user_id, engine_id=self.engine_id)
return user.to_dict()
@request_context
@ -294,12 +309,21 @@ class EngineService(service.Service):
resource.rate = rule.get_price(resource)
# Update user with resource
res = bilean_lock.user_lock_acquire(user.id, self.engine_id)
if not res:
LOG.error(_LE('Failed grabbing the lock for user %s'), user.id)
return
try:
# Reload user to ensure the info is latest.
user = user_mod.User.load(admin_context, user_id=user_id)
user.update_with_resource(admin_context, resource)
resource.store(admin_context)
# As the rate of user has changed, the billing job for the user
# should change too.
self.scheduler.update_user_job(user)
finally:
bilean_lock.user_lock_release(user.id, engine_id=self.engine_id)
return resource.to_dict()
@ -335,26 +359,48 @@ class EngineService(service.Service):
res.properties = resource['properties']
rule = rule_base.Rule.load(admin_context, rule_id=res.rule_id)
res.rate = rule.get_price(res)
res.store(admin_context)
res.d_rate = res.rate - old_rate
result = bilean_lock.user_lock_acquire(res.user_id, self.engine_id)
if not result:
LOG.error(_LE('Failed grabbing the lock for user %s'), res.user_id)
return False
try:
user = user_mod.User.load(admin_context, res.user_id)
user.update_with_resource(admin_context, res, action='update')
res.store(admin_context)
self.scheduler.update_user_job(user)
finally:
bilean_lock.user_lock_release(user.id, engine_id=self.engine_id)
return True
def resource_delete(self, cnxt, resource_id):
"""Do resource delete"""
admin_context = bilean_context.get_admin_context()
try:
res = resource_base.Resource.load(
admin_context, resource_id=resource_id, project_safe=False)
admin_context, resource_id=resource_id)
except exception.ResourceNotFound:
return False
result = bilean_lock.user_lock_acquire(res.user_id, self.engine_id)
if not result:
LOG.error(_LE('Failed grabbing the lock for user %s'), res.user_id)
return False
try:
user = user_mod.User.load(admin_context, user_id=res.user_id)
user.update_with_resource(admin_context, res, action='delete')
self.scheduler.update_user_job(user)
res.delete(admin_context)
finally:
bilean_lock.user_lock_release(user.id, engine_id=self.engine_id)
return True
@request_context
def event_list(self, cnxt, user_id=None, limit=None, marker=None,

View File

@ -15,6 +15,7 @@ import six
from bilean.common import exception
from bilean.common.i18n import _
from bilean.common.i18n import _LI
from bilean.common import utils
from bilean.db import api as db_api
from bilean.drivers import base as driver_base
@ -187,18 +188,19 @@ class User(object):
self.status_reason = reason
self.store(context)
def update_with_resource(self, context, resource, action='create'):
def update_with_resource(self, context, resource, do_bill=True,
action='create'):
'''Update user with resource'''
if do_bill:
self.do_bill(context)
if 'create' == action:
d_rate = resource.rate
if self.rate > 0:
self.do_bill(context)
elif 'delete' == action:
self.do_bill(context)
d_rate = -resource.rate
elif 'update' == action:
self.do_bill(context)
d_rate = resource.d_rate
self._change_user_rate(context, d_rate)
self.store(context)
@ -211,13 +213,16 @@ class User(object):
if d_rate > 0 and self.status == self.FREE:
self.status = self.ACTIVE
elif d_rate < 0:
if new_rate == 0 and self.balance > 0:
if new_rate == 0 and self.balance >= 0:
self.status = self.FREE
elif new_rate == 0 and self.balance < 0:
self.status = self.FREEZE
elif self.status == self.WARNING:
p_time = cfg.CONF.scheduler.prior_notify_time * 3600
rest_usage = p_time * new_rate
if self.balance > rest_usage:
if not self.notify_or_not():
reason = _("Status change from 'warning' to 'active' "
"because of resource deleting.")
self.status = self.ACTIVE
self.status_reason = reason
self.rate = new_rate
def do_recharge(self, context, value):
@ -232,23 +237,19 @@ class User(object):
"of recharge.")
self.set_status(context, self.FREE, reason=reason)
elif self.status == self.WARNING:
prior_notify_time = cfg.CONF.scheduler.prior_notify_time * 3600
rest_usage = prior_notify_time * self.rate
if self.balance > rest_usage:
reason = _("Status change from warning to active because "
if not self.notify_or_not():
reason = _("Status change from 'warning' to 'active' because "
"of recharge.")
self.set_status(context, self.ACTIVE, reason=reason)
event_mod.record(context, self.id, action='recharge', value=value)
def _freeze(self, context, reason=None):
'''Freeze user when balance overdraft.'''
LOG.info(_("Freeze user %(user_id), reason: %(reason)s"),
{'user_id': self.id, 'reason': reason})
resources = resource_base.Resource.load_all(
context, user_id=self.id, project_safe=False)
for resource in resources:
resource.do_delete()
self.set_status(context, self.FREEZE, reason)
def notify_or_not(self):
'''Check if user should be notified.'''
prior_notify_time = cfg.CONF.scheduler.prior_notify_time * 3600
rest_usage = prior_notify_time * self.rate
if self.balance > rest_usage:
return False
return True
def do_delete(self, context):
db_api.user_delete(context, self.id)
@ -256,13 +257,29 @@ class User(object):
def do_bill(self, context):
'''Do bill once, pay the cost until now.'''
if self.status not in [self.ACTIVE, self.WARNING]:
LOG.info(_LI("Ignore bill action because user is in '%s' "
"status."), self.status)
return
now = timeutils.utcnow()
total_seconds = (now - self.last_bill).total_seconds()
self.balance = self.balance - self.rate * total_seconds
cost = self.rate * total_seconds
if cost > 0:
self.balance -= cost
self.last_bill = now
if self.balance <= 0:
self._freeze(context, reason="Balance overdraft")
self.store(context)
event_mod.record(context, self.id,
action='charge',
event_mod.record(context, self.id, action='charge',
seconds=total_seconds)
def _freeze(self, context, reason=None):
'''Freeze user when balance overdraft.'''
LOG.info(_LI("Freeze user %(user_id)s, reason: %(reason)s"),
{'user_id': self.id, 'reason': reason})
resources = resource_base.Resource.load_all(
context, user_id=self.id, project_safe=False)
for resource in resources:
if resource.do_delete(context):
self._change_user_rate(context, -resource.rate)

View File

@ -96,6 +96,8 @@ class Resource(object):
def load(cls, context, resource_id=None, resource=None,
show_deleted=False, project_safe=True):
'''Retrieve a resource from database.'''
if context.is_admin:
project_safe = False
if resource is None:
resource = db_api.resource_get(context, resource_id,
show_deleted=show_deleted,
@ -145,7 +147,7 @@ class Resource(object):
return NotImplemented
def do_delete(self, ignore_missing=True, timeout=None):
def do_delete(self, context, ignore_missing=True, timeout=None):
'''Delete resource from other services.'''
return NotImplemented

View File

@ -14,6 +14,7 @@
import six
from bilean.common.i18n import _LE
from bilean.db import api as db_api
from bilean.drivers import base as driver_base
from bilean.resources import base
@ -35,9 +36,13 @@ class ServerResource(base.Resource):
# TODO(ldb)
return NotImplemented
def do_delete(self, ignore_missing=True, timeout=None):
def do_delete(self, context, ignore_missing=True, timeout=None):
'''Delete resource from other services.'''
# Delete resource from db
db_api.resource_delete(context, self.id)
# Delete resource from nova
novaclient = driver_base.BileanDriver().compute()
try:
novaclient.server_delete(self.id, ignore_missing=ignore_missing)