Separate scheduler as a service

Separate scheduler as a service to improve engine's performance.

Change-Id: I76446a08036df886c8d7e14430d08ed901901799
This commit is contained in:
lvdongbing 2016-03-25 02:57:35 -04:00
parent c4546c1d5c
commit 1782a3d694
30 changed files with 434 additions and 285 deletions

View File

@ -35,10 +35,10 @@ class PolicyData(object):
return self.data[consts.POLICY_NAME]
def rules(self):
return self.data.get(consts.POLICY_RULES, None)
return self.data.get(consts.POLICY_RULES)
def metadata(self):
return self.data.get(consts.RULE_METADATA, None)
return self.data.get(consts.RULE_METADATA)
class PolicyController(object):
@ -106,7 +106,7 @@ class PolicyController(object):
if not validator.is_valid_body(body):
raise exc.HTTPUnprocessableEntity()
policy_data = body.get('policy', None)
policy_data = body.get('policy')
if policy_data is None:
raise exc.HTTPBadRequest(_("Malformed request data, missing "
"'policy' key in request body."))
@ -123,14 +123,14 @@ class PolicyController(object):
if not validator.is_valid_body(body):
raise exc.HTTPUnprocessableEntity()
policy_data = body.get('policy', None)
policy_data = body.get('policy')
if policy_data is None:
raise exc.HTTPBadRequest(_("Malformed request data, missing "
"'policy' key in request body."))
name = policy_data.get(consts.POLICY_NAME, None)
metadata = policy_data.get(consts.POLICY_METADATA, None)
is_default = policy_data.get(consts.POLICY_IS_DEFAULT, None)
name = policy_data.get(consts.POLICY_NAME)
metadata = policy_data.get(consts.POLICY_METADATA)
is_default = policy_data.get(consts.POLICY_IS_DEFAULT)
policy = self.rpc_client.policy_update(req.context, policy_id, name,
metadata, is_default)

View File

@ -94,13 +94,14 @@ class ResourceController(object):
if not validator.is_valid_body(body):
raise exc.HTTPUnprocessableEntity()
resources = body.get('resources', None)
resources = body.get('resources')
if not resources:
msg = _("Resources is empty")
raise exc.HTTPBadRequest(explanation=msg)
if body.get('count', None):
count = body.get('count')
if count:
try:
validator.validate_integer(body.get('count'), 'count',
validator.validate_integer(count, 'count',
consts.MIN_RESOURCE_NUM,
consts.MAX_RESOURCE_NUM)
except exception.InvalidInput as e:

View File

@ -40,7 +40,7 @@ class RuleData(object):
return self.data[consts.RULE_SPEC]
def metadata(self):
return self.data.get(consts.RULE_METADATA, None)
return self.data.get(consts.RULE_METADATA)
class RuleController(object):
@ -103,7 +103,7 @@ class RuleController(object):
if not validator.is_valid_body(body):
raise exc.HTTPUnprocessableEntity()
rule_data = body.get('rule', None)
rule_data = body.get('rule')
if rule_data is None:
raise exc.HTTPBadRequest(_("Malformed request data, missing "
"'rule' key in request body."))

View File

@ -101,14 +101,14 @@ class UserController(object):
raise exc.HTTPBadRequest(msg)
if action == self.ATTACH_POLICY:
policy = body.get(action).get('policy', None)
policy = body.get(action).get('policy')
if policy is None:
raise exc.HTTPBadRequest(_("Malformed request data, no policy "
"specified to attach."))
user = self.rpc_client.user_attach_policy(
req.context, user_id, policy)
elif action == self.RECHARGE:
value = body.get(action).get('value', None)
value = body.get(action).get('value')
if value is None:
raise exc.HTTPBadRequest(_("Malformed request data, missing "
"'value' key in request body."))

18
bin/bilean-api → bilean/cmd/api.py Executable file → Normal file
View File

@ -13,21 +13,16 @@
# under the License.
"""
Bilean API Server. An OpenStack ReST API to Bilean
Bilean API Server.
An OpenStack ReST API to Bilean
"""
import eventlet
eventlet.monkey_patch(os=False)
import os
import sys
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir,
os.pardir))
if os.path.exists(os.path.join(possible_topdir, 'bilean', '__init__.py')):
sys.path.insert(0, possible_topdir)
from bilean.common import config
from bilean.common.i18n import _LI
from bilean.common import messaging
@ -35,19 +30,18 @@ from bilean.common import wsgi
from bilean import version
from oslo_config import cfg
from oslo_i18n import _lazy
import oslo_i18n as i18n
from oslo_log import log as logging
from oslo_service import systemd
import six
_lazy.enable_lazy()
i18n.enable_lazy()
LOG = logging.getLogger('bilean.api')
if __name__ == "__main__":
def main():
try:
logging.register_options(cfg.CONF)
cfg.CONF(project='bilean', prog='bilean-api',

17
bin/bilean-engine → bilean/cmd/engine.py Executable file → Normal file
View File

@ -13,25 +13,12 @@
# under the License.
"""
Bilean Engine Server. This does the work of actually implementing the API
calls made by the user. Normal communications is done via the bilean API
which then calls into this engine.
Bilean Engine Server.
"""
import eventlet
eventlet.monkey_patch()
import os
import sys
# If ../bilean/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir,
os.pardir))
if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'bilean', '__init__.py')):
sys.path.insert(0, POSSIBLE_TOPDIR)
from bilean.common import consts
from bilean.common import messaging
@ -45,7 +32,7 @@ _lazy.enable_lazy()
LOG = logging.getLogger('bilean.engine')
if __name__ == '__main__':
def main():
logging.register_options(cfg.CONF)
cfg.CONF(project='bilean', prog='bilean-engine')
logging.setup(cfg.CONF, 'bilean-engine')

14
bin/bilean-notification → bilean/cmd/notification.py Executable file → Normal file
View File

@ -15,23 +15,11 @@
import eventlet
eventlet.monkey_patch()
import os
import sys
# If ../bilean/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir,
os.pardir))
if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'bilean', '__init__.py')):
sys.path.insert(0, POSSIBLE_TOPDIR)
from oslo_config import cfg
from oslo_i18n import _lazy
from oslo_log import log as logging
from oslo_service import service
from bilean.common import config
from bilean.common import messaging
_lazy.enable_lazy()
@ -39,7 +27,7 @@ _lazy.enable_lazy()
LOG = logging.getLogger('bilean.notification')
if __name__ == "__main__":
def main():
logging.register_options(cfg.CONF)
cfg.CONF(project='bilean', prog='bilean-notification')
logging.setup(cfg.CONF, 'bilean-notification')

46
bilean/cmd/scheduler.py Normal file
View File

@ -0,0 +1,46 @@
#!/usr/bin/env python
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Bilean Scheduler Server.
"""
import eventlet
eventlet.monkey_patch()
from bilean.common import consts
from bilean.common import messaging
from oslo_config import cfg
from oslo_i18n import _lazy
from oslo_log import log as logging
from oslo_service import service
_lazy.enable_lazy()
LOG = logging.getLogger('bilean.scheduler')
def main():
logging.register_options(cfg.CONF)
cfg.CONF(project='bilean', prog='bilean-scheduler')
logging.setup(cfg.CONF, 'bilean-scheduler')
logging.set_defaults()
messaging.setup()
from bilean.scheduler import service as scheduler
srv = scheduler.SchedulerService(cfg.CONF.host, consts.SCHEDULER_TOPIC)
launcher = service.launch(cfg.CONF, srv)
launcher.wait()

View File

@ -19,14 +19,14 @@ MAX_RESOURCE_NUM = 1000
RPC_ATTRs = (
ENGINE_TOPIC,
ENGINE_HEALTH_MGR_TOPIC,
SCHEDULER_TOPIC,
NOTIFICATION_TOPICS,
RPC_API_VERSION,
) = (
'bilean-engine',
'engine-health_mgr',
'bilean-scheduler',
'billing_notifications',
'1.0',
'1.1',
)
RPC_PARAMS = (

View File

@ -154,8 +154,8 @@ def job_create(context, values):
return IMPL.job_create(context, values)
def job_get_all(context, engine_id=None):
return IMPL.job_get_all(context, engine_id=engine_id)
def job_get_all(context, scheduler_id=None):
return IMPL.job_get_all(context, scheduler_id=scheduler_id)
def job_delete(context, job_id):

View File

@ -395,10 +395,10 @@ def job_create(context, values):
return job_ref
def job_get_all(context, engine_id=None):
def job_get_all(context, scheduler_id=None):
query = model_query(context, models.Job)
if engine_id:
query = query.filter_by(engine_id=engine_id)
if scheduler_id:
query = query.filter_by(scheduler_id=scheduler_id)
return query.all()

View File

@ -28,7 +28,7 @@ def upgrade(migrate_engine):
'job', meta,
sqlalchemy.Column('id', sqlalchemy.String(50),
primary_key=True, nullable=False),
sqlalchemy.Column('engine_id', sqlalchemy.String(36),
sqlalchemy.Column('scheduler_id', sqlalchemy.String(36),
nullable=False),
sqlalchemy.Column('job_type', sqlalchemy.String(10),
nullable=False),

View File

@ -186,7 +186,7 @@ class Job(BASE, BileanBase):
id = sqlalchemy.Column(sqlalchemy.String(50), primary_key=True,
unique=True)
engine_id = sqlalchemy.Column(sqlalchemy.String(36))
scheduler_id = sqlalchemy.Column(sqlalchemy.String(36))
job_type = sqlalchemy.Column(sqlalchemy.String(10))
parameters = sqlalchemy.Column(types.Dict())

View File

@ -54,7 +54,7 @@ def parse_exception(ex):
if data:
code = data.get('code', code)
message = data.get('message', message)
error = data.get('error', None)
error = data.get('error')
if error:
code = data.get('code', code)
message = data['error'].get('message', message)
@ -95,7 +95,7 @@ def create_connection(params=None):
if params is None:
params = {}
if params.get('token', None):
if params.get('token'):
auth_plugin = 'token'
else:
auth_plugin = 'password'

View File

@ -30,9 +30,9 @@ class Event(object):
def __init__(self, timestamp, **kwargs):
self.timestamp = timestamp
self.user_id = kwargs.get('user_id', None)
self.action = kwargs.get('action', None)
self.resource_type = kwargs.get('resource_type', None)
self.user_id = kwargs.get('user_id')
self.action = kwargs.get('action')
self.resource_type = kwargs.get('resource_type')
self.value = kwargs.get('value', 0)
@classmethod
@ -82,6 +82,7 @@ class Event(object):
def store(self, context):
'''Store the event into database and return its ID.'''
values = {
'timestamp': self.timestamp,
'user_id': self.user_id,
'action': self.action,
'resource_type': self.resource_type,
@ -125,13 +126,21 @@ def record(context, user_id, action=None, seconds=0, value=0):
if action == 'charge':
resources = resource_base.Resource.load_all(
context, user_id=user_id, project_safe=False)
res_mapping = {}
for resource in resources:
usage = resource.rate * seconds
if res_mapping.get(resource.resource_type) is None:
res_mapping[resource.resource_type] = usage
else:
res_mapping[resource.resource_type] += usage
for res_type in res_mapping.keys():
event = Event(timeutils.utcnow(),
user_id=user_id,
action=action,
resource_type=resource.resource_type,
value=usage)
resource_type=res_type,
value=res_mapping.get(res_type))
event.store(context)
elif action == 'recharge':
event = Event(timeutils.utcnow(),

View File

@ -23,15 +23,15 @@ class Policy(object):
def __init__(self, name, **kwargs):
self.name = name
self.id = kwargs.get('id', None)
self.id = kwargs.get('id')
self.is_default = kwargs.get('is_default', False)
# rules schema like [{'id': 'xxx', 'type': 'os.nova.server'}]
self.rules = kwargs.get('rules', [])
self.metadata = kwargs.get('metadata', None)
self.metadata = kwargs.get('metadata')
self.created_at = kwargs.get('created_at', None)
self.updated_at = kwargs.get('updated_at', None)
self.deleted_at = kwargs.get('deleted_at', None)
self.created_at = kwargs.get('created_at')
self.updated_at = kwargs.get('updated_at')
self.deleted_at = kwargs.get('deleted_at')
def store(self, context):
"""Store the policy record into database table."""

View File

@ -19,6 +19,7 @@ from oslo_log import log as logging
import oslo_messaging
from oslo_service import service
from bilean.common import consts
from bilean.common import context as bilean_context
from bilean.common import exception
from bilean.common.i18n import _
@ -31,10 +32,10 @@ from bilean.engine import environment
from bilean.engine import event as event_mod
from bilean.engine import lock as bilean_lock
from bilean.engine import policy as policy_mod
from bilean.engine import scheduler
from bilean.engine import user as user_mod
from bilean.resources import base as resource_base
from bilean.rules import base as rule_base
from bilean import scheduler as bilean_scheduler
LOG = logging.getLogger(__name__)
@ -63,14 +64,11 @@ class EngineService(service.Service):
by the RPC caller.
"""
RPC_API_VERSION = '1.1'
def __init__(self, host, topic, manager=None, context=None):
super(EngineService, self).__init__()
self.host = host
self.topic = topic
self.scheduler = None
self.engine_id = None
self.target = None
self._rpc_server = None
@ -78,18 +76,8 @@ class EngineService(service.Service):
def start(self):
self.engine_id = socket.gethostname()
LOG.info(_LI("Initialise bilean users from keystone."))
admin_context = bilean_context.get_admin_context()
user_mod.User.init_users(admin_context)
self.scheduler = scheduler.BileanScheduler(engine_id=self.engine_id)
LOG.info(_LI("Starting billing scheduler for engine: %s"),
self.engine_id)
self.scheduler.init_scheduler()
self.scheduler.start()
LOG.info(_LI("Starting rpc server for engine: %s"), self.engine_id)
target = oslo_messaging.Target(version=self.RPC_API_VERSION,
target = oslo_messaging.Target(version=consts.RPC_API_VERSION,
server=self.host,
topic=self.topic)
self.target = target
@ -111,11 +99,6 @@ class EngineService(service.Service):
def stop(self):
self._stop_rpc_server()
LOG.info(_LI("Stopping billing scheduler for engine: %s"),
self.engine_id)
self.scheduler.stop()
super(EngineService, self).stop()
@request_context
@ -162,7 +145,8 @@ class EngineService(service.Service):
user.do_recharge(cnxt, value)
# As user has been updated, the billing job for the user
# should to be updated too.
self.scheduler.update_user_job(user)
bilean_scheduler.notify(bilean_scheduler.UPDATE_JOBS,
user=user.to_dict())
finally:
bilean_lock.user_lock_release(user_id, engine_id=self.engine_id)
@ -176,7 +160,8 @@ class EngineService(service.Service):
LOG.error(_LE("User (%s) is in use, can not delete."), user_id)
return
user_mod.User.delete(cnxt, user_id=user_id)
self.scheduler.delete_user_jobs(user)
bilean_scheduler.notify(bilean_scheduler.DELETE_JOBS,
user=user.to_dict())
@request_context
def user_attach_policy(self, cnxt, user_id, policy_id):
@ -321,7 +306,8 @@ class EngineService(service.Service):
# As the rate of user has changed, the billing job for the user
# should change too.
self.scheduler.update_user_job(user)
bilean_scheduler.notify(bilean_scheduler.UPDATE_JOBS,
user=user.to_dict())
finally:
bilean_lock.user_lock_release(user.id, engine_id=self.engine_id)
@ -371,7 +357,8 @@ class EngineService(service.Service):
res.store(admin_context)
self.scheduler.update_user_job(user)
bilean_scheduler.notify(bilean_scheduler.UPDATE_JOBS,
user=user.to_dict())
finally:
bilean_lock.user_lock_release(user.id, engine_id=self.engine_id)
@ -394,7 +381,8 @@ class EngineService(service.Service):
user = user_mod.User.load(admin_context, user_id=res.user_id)
user.update_with_resource(admin_context, res, action='delete')
self.scheduler.update_user_job(user)
bilean_scheduler.notify(bilean_scheduler.UPDATE_JOBS,
user=user.to_dict())
res.delete(admin_context)
finally:
@ -563,3 +551,16 @@ class EngineService(service.Service):
def policy_delete(self, cnxt, policy_id):
LOG.info(_LI("Deleting policy: '%s'."), policy_id)
policy_mod.Policy.delete(cnxt, policy_id)
def settle_account(self, cnxt, user_id, task=None):
res = bilean_lock.user_lock_acquire(user_id, self.engine_id)
if not res:
LOG.error(_LE('Failed grabbing the lock for user %s'), user_id)
return
try:
user = user_mod.User.load(cnxt, user_id=user_id)
user.settle_account(cnxt, task=task)
finally:
bilean_lock.user_lock_release(user_id, engine_id=self.engine_id)
return user.to_dict()

View File

@ -20,6 +20,7 @@ from bilean.common import utils
from bilean.db import api as db_api
from bilean.drivers import base as driver_base
from bilean.engine import event as event_mod
from bilean import notifier as bilean_notifier
from bilean.resources import base as resource_base
from oslo_config import cfg
@ -40,18 +41,18 @@ class User(object):
def __init__(self, user_id, **kwargs):
self.id = user_id
self.policy_id = kwargs.get('policy_id', None)
self.policy_id = kwargs.get('policy_id')
self.balance = kwargs.get('balance', 0)
self.rate = kwargs.get('rate', 0.0)
self.credit = kwargs.get('credit', 0)
self.last_bill = kwargs.get('last_bill', None)
self.last_bill = kwargs.get('last_bill')
self.status = kwargs.get('status', self.INIT)
self.status_reason = kwargs.get('status_reason', 'Init user')
self.created_at = kwargs.get('created_at', None)
self.updated_at = kwargs.get('updated_at', None)
self.deleted_at = kwargs.get('deleted_at', None)
self.created_at = kwargs.get('created_at')
self.updated_at = kwargs.get('updated_at')
self.deleted_at = kwargs.get('deleted_at')
def store(self, context):
"""Store the user record into database table."""
@ -97,7 +98,8 @@ class User(object):
user = cls(pid, status=cls.INIT,
status_reason='Init from keystone')
user.store(context)
return True
users.append(user)
return users
@classmethod
def _from_db_record(cls, record):
@ -165,6 +167,10 @@ class User(object):
return True
return False
@classmethod
def from_dict(cls, values):
return cls(values.get('id'), **values)
def to_dict(self):
user_dict = {
'id': self.id,
@ -188,11 +194,9 @@ class User(object):
self.status_reason = reason
self.store(context)
def update_with_resource(self, context, resource, do_bill=True,
action='create'):
def update_with_resource(self, context, resource, action='create'):
'''Update user with resource'''
if do_bill:
self.do_bill(context)
self._settle_account(context)
if 'create' == action:
d_rate = resource.rate
@ -217,6 +221,7 @@ class User(object):
self.status = self.FREE
elif new_rate == 0 and self.balance < 0:
self.status = self.FREEZE
self.satus_reason = "Balance overdraft"
elif self.status == self.WARNING:
if not self.notify_or_not():
reason = _("Status change from 'warning' to 'active' "
@ -228,23 +233,32 @@ class User(object):
def do_recharge(self, context, value):
'''Do recharge for user.'''
if self.rate > 0 and self.status != self.FREEZE:
self.do_bill(context)
self._settle_account(context)
self.balance += value
if self.status == self.INIT and self.balance > 0:
self.set_status(context, self.FREE, reason='Recharged')
self.status = self.FREE
self.status_reason = "Recharged"
elif self.status == self.FREEZE and self.balance > 0:
reason = _("Status change from 'freeze' to 'free' because "
"of recharge.")
self.set_status(context, self.FREE, reason=reason)
self.status = self.FREE
self.status_reason = reason
elif self.status == self.WARNING:
if not self.notify_or_not():
reason = _("Status change from 'warning' to 'active' because "
"of recharge.")
self.set_status(context, self.ACTIVE, reason=reason)
self.status = self.ACTIVE
self.status_reason = reason
self.store(context)
event_mod.record(context, self.id, action='recharge', value=value)
def notify_or_not(self):
'''Check if user should be notified.'''
cfg.CONF.import_opt('prior_notify_time',
'bilean.scheduler.cron_scheduler',
group='scheduler')
prior_notify_time = cfg.CONF.scheduler.prior_notify_time * 3600
rest_usage = prior_notify_time * self.rate
if self.balance > rest_usage:
@ -255,10 +269,9 @@ class User(object):
db_api.user_delete(context, self.id)
return True
def do_bill(self, context):
'''Do bill once, pay the cost until now.'''
def _settle_account(self, context):
if self.status not in [self.ACTIVE, self.WARNING]:
LOG.info(_LI("Ignore bill action because user is in '%s' "
LOG.info(_LI("Ignore settlement action because user is in '%s' "
"status."), self.status)
return
@ -268,9 +281,6 @@ class User(object):
if cost > 0:
self.balance -= cost
self.last_bill = now
if self.balance <= 0:
self._freeze(context, reason="Balance overdraft")
self.store(context)
event_mod.record(context, self.id, action='charge',
seconds=total_seconds)
@ -283,3 +293,21 @@ class User(object):
for resource in resources:
if resource.do_delete(context):
self._change_user_rate(context, -resource.rate)
def settle_account(self, context, task=None):
'''Settle account for user.'''
notifier = bilean_notifier.Notifier()
self._settle_account(context)
if task == 'notify' and self.notify_or_not():
self.status_reason = "The balance is almost used up"
self.status = self.WARNING
# Notify user
msg = {'user': self.id, 'notification': self.status_reason}
notifier.info('billing.notify', msg)
elif task == 'freeze' and self.balance <= 0:
self._freeze(context, reason="Balance overdraft")
msg = {'user': self.id, 'notification': self.status_reason}
notifier.info('billing.notify', msg)
self.store(context)

View File

@ -54,9 +54,9 @@ class ResourceAction(Action):
def __init__(self, cnxt, action, data):
super(ResourceAction, self).__init__(cnxt, action, data)
self.id = data.get('resource_ref', None)
self.user_id = data.get('user_id', None)
self.resource_type = data.get('resource_type', None)
self.id = data.get('resource_ref')
self.user_id = data.get('user_id')
self.resource_type = data.get('resource_type')
self.properties = {}
self._parse_and_validate()

View File

@ -51,7 +51,7 @@ class EventsNotificationEndpoint(object):
def process_identity_notification(self, notification):
"""Convert notification to user."""
user_id = notification['payload'].get('resource_info', None)
user_id = notification['payload'].get('resource_info')
if not user_id:
LOG.error(_LE("Cannot retrieve user_id from notification: %s"),
notification)

View File

@ -40,13 +40,13 @@ class Resource(object):
self.resource_type = resource_type
self.properties = properties
self.rule_id = kwargs.get('rule_id', None)
self.rule_id = kwargs.get('rule_id')
self.rate = kwargs.get('rate', 0)
self.d_rate = 0
self.created_at = kwargs.get('created_at', None)
self.updated_at = kwargs.get('updated_at', None)
self.deleted_at = kwargs.get('deleted_at', None)
self.created_at = kwargs.get('created_at')
self.updated_at = kwargs.get('updated_at')
self.deleted_at = kwargs.get('deleted_at')
def store(self, context):
"""Store the resource record into database table."""

View File

@ -29,6 +29,7 @@ class EngineClient(object):
BASE_RPC_API_VERSION = '1.0'
def __init__(self):
cfg.CONF.import_opt('host', 'bilean.common.config')
self._client = messaging.get_rpc_client(
topic=consts.ENGINE_TOPIC,
server=cfg.CONF.host,
@ -54,6 +55,7 @@ class EngineClient(object):
client = self._client
return client.cast(ctxt, method, **kwargs)
# users
def user_list(self, ctxt, show_deleted=False, limit=None,
marker=None, sort_keys=None, sort_dir=None,
filters=None):
@ -88,6 +90,7 @@ class EngineClient(object):
user_id=user_id,
policy_id=policy_id))
# rules
def rule_list(self, ctxt, limit=None, marker=None, sort_keys=None,
sort_dir=None, filters=None, show_deleted=False):
return self.call(ctxt, self.make_msg('rule_list', limit=limit,
@ -113,6 +116,7 @@ class EngineClient(object):
return self.call(ctxt, self.make_msg('rule_delete',
rule_id=rule_id))
# resources
def resource_list(self, ctxt, user_id=None, limit=None, marker=None,
sort_keys=None, sort_dir=None, filters=None,
project_safe=True, show_deleted=False):
@ -144,6 +148,7 @@ class EngineClient(object):
return self.call(ctxt, self.make_msg('resource_delete',
resource_id=resource_id))
# events
def event_list(self, ctxt, user_id=None, limit=None, marker=None,
sort_keys=None, sort_dir=None, filters=None,
start_time=None, end_time=None, project_safe=True,
@ -162,6 +167,7 @@ class EngineClient(object):
return self.call(cnxt, self.make_msg('validate_creation',
resources=resources))
# policies
def policy_list(self, ctxt, limit=None, marker=None, sort_keys=None,
sort_dir=None, filters=None, show_deleted=False):
return self.call(ctxt, self.make_msg('policy_list', limit=limit,
@ -197,3 +203,8 @@ class EngineClient(object):
return self.call(ctxt, self.make_msg('policy_add_rules',
policy_id=policy_id,
rules=rules))
def settle_account(self, ctxt, user_id, task=None):
return self.call(ctxt, self.make_msg('settle_account',
user_id=user_id,
task=task))

View File

@ -80,14 +80,14 @@ class Rule(object):
self.name = name
self.spec = spec
self.id = kwargs.get('id', None)
self.id = kwargs.get('id')
self.type = kwargs.get('type', '%s-%s' % (type_name, version))
self.metadata = kwargs.get('metadata', {})
self.created_at = kwargs.get('created_at', None)
self.updated_at = kwargs.get('updated_at', None)
self.deleted_at = kwargs.get('deleted_at', None)
self.created_at = kwargs.get('created_at')
self.updated_at = kwargs.get('updated_at')
self.deleted_at = kwargs.get('deleted_at')
self.spec_data = schema.Spec(self.spec_schema, self.spec)
self.properties = schema.Spec(self.properties_schema,

View File

@ -73,7 +73,7 @@ class ServerRule(base.Rule):
:param: resource: Resource object to find price.
'''
flavor = resource.properties.get('flavor', None)
flavor = resource.properties.get('flavor')
if not flavor:
raise exception.Error(msg='Flavor should be provided to get '
'the price of server.')

View File

@ -0,0 +1,51 @@
#
# Copyright 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from bilean.common import consts
from bilean.common import messaging
from oslo_context import context as oslo_context
import oslo_messaging
supported_actions = (
UPDATE_JOBS, DELETE_JOBS,
) = (
'update_jobs', 'delete_jobs',
)
def notify(method, scheduler_id=None, **kwargs):
'''Send notification to scheduler
:param method: remote method to call
:param scheduler_id: specify scheduler to notify; None implies broadcast
'''
if scheduler_id:
# Notify specific scheduler identified by scheduler_id
client = messaging.get_rpc_client(
topic=consts.SCHEDULER_TOPIC,
server=scheduler_id,
version=consts.RPC_API_VERSION)
else:
# Broadcast to all schedulers
client = messaging.get_rpc_client(
topic=consts.SCHEDULER_TOPIC,
version=consts.RPC_API_VERSION)
try:
client.call(oslo_context.get_current(), method, **kwargs)
return True
except oslo_messaging.MessagingTimeout:
return False

View File

@ -14,13 +14,11 @@
from bilean.common import context as bilean_context
from bilean.common import exception
from bilean.common.i18n import _
from bilean.common.i18n import _LE
from bilean.common.i18n import _LI
from bilean.common import utils
from bilean.db import api as db_api
from bilean.engine import lock as bilean_lock
from bilean.engine import user as user_mod
from bilean import notifier
from bilean.rpc import client as rpc_client
from oslo_config import cfg
from oslo_log import log as logging
@ -61,8 +59,8 @@ cfg.CONF.register_opts(scheduler_opts, group=scheduler_group)
LOG = logging.getLogger(__name__)
class BileanScheduler(object):
"""Billing scheduler based on apscheduler"""
class CronScheduler(object):
"""Cron scheduler based on apscheduler"""
job_types = (
NOTIFY, DAILY, FREEZE,
@ -72,39 +70,50 @@ class BileanScheduler(object):
trigger_types = (DATE, CRON) = ('date', 'cron')
def __init__(self, **kwargs):
super(BileanScheduler, self).__init__()
super(CronScheduler, self).__init__()
self._scheduler = BackgroundScheduler()
self.notifier = notifier.Notifier()
self.engine_id = kwargs.get('engine_id', None)
self.scheduler_id = kwargs.get('scheduler_id')
self.rpc_client = rpc_client.EngineClient()
if cfg.CONF.scheduler.store_ap_job:
self._scheduler.add_jobstore(cfg.CONF.scheduler.backend,
url=cfg.CONF.scheduler.connection)
def start(self):
LOG.info(_('Starting Cron scheduler'))
self._scheduler.start()
def stop(self):
LOG.info(_('Stopping Cron scheduler'))
self._scheduler.shutdown()
def init_scheduler(self):
"""Init all jobs related to the engine from db."""
admin_context = bilean_context.get_admin_context()
jobs = [] or db_api.job_get_all(admin_context,
engine_id=self.engine_id)
scheduler_id=self.scheduler_id)
for job in jobs:
if self.is_exist(job.id):
if self._is_exist(job.id):
continue
task_name = "_%s_task" % (job.job_type)
task = getattr(self, task_name)
LOG.info(_LI("Add job '%(job_id)s' to engine '%(engine_id)s'."),
{'job_id': job.id, 'engine_id': self.engine_id})
LOG.info(_LI("Add job '%(job_id)s' to scheduler '%(id)s'."),
{'job_id': job.id, 'id': self.scheduler_id})
tg_type = self.CRON if job.job_type == self.DAILY else self.DAILY
self.add_job(task, job.id, trigger_type=tg_type,
self._add_job(task, job.id, trigger_type=tg_type,
params=job.parameters)
LOG.info(_LI("Initialise users from keystone."))
users = user_mod.User.init_users(admin_context)
# Init daily job for all users
users = user_mod.User.load_all(admin_context)
if users:
for user in users:
job_id = self._generate_job_id(user.id, self.DAILY)
if self.is_exist(job_id):
if self._is_exist(job_id):
continue
self._add_daily_job(user)
def add_job(self, task, job_id, trigger_type='date', **kwargs):
def _add_job(self, task, job_id, trigger_type='date', **kwargs):
"""Add a job to scheduler by given data.
:param str|unicode user_id: used as job_id
@ -128,8 +137,8 @@ class BileanScheduler(object):
return
# Add a cron type job
hour = kwargs.get('hour', None)
minute = kwargs.get('minute', None)
hour = kwargs.get('hour')
minute = kwargs.get('minute')
if not hour or not minute:
hour, minute = self._generate_timer()
self._scheduler.add_job(task, 'cron',
@ -140,7 +149,7 @@ class BileanScheduler(object):
id=job_id,
misfire_grace_time=mg_time)
def modify_job(self, job_id, **changes):
def _modify_job(self, job_id, **changes):
"""Modifies the properties of a single job.
Modifications are passed to this method as extra keyword arguments.
@ -150,7 +159,7 @@ class BileanScheduler(object):
self._scheduler.modify_job(job_id, **changes)
def remove_job(self, job_id):
def _remove_job(self, job_id):
"""Removes a job, preventing it from being run any more.
:param str|unicode job_id: the identifier of the job
@ -158,15 +167,7 @@ class BileanScheduler(object):
self._scheduler.remove_job(job_id)
def start(self):
LOG.info(_('Starting Billing scheduler'))
self._scheduler.start()
def stop(self):
LOG.info(_('Stopping Billing scheduler'))
self._scheduler.shutdown()
def is_exist(self, job_id):
def _is_exist(self, job_id):
"""Returns if the Job exists that matches the given ``job_id``.
:param str|unicode job_id: the identifier of the job
@ -177,65 +178,36 @@ class BileanScheduler(object):
return job is not None
def _notify_task(self, user_id):
res = bilean_lock.user_lock_acquire(user_id, self.engine_id)
if not res:
LOG.error(_LE('Failed grabbing the lock for user %s'), user_id)
return
admin_context = bilean_context.get_admin_context()
user = user_mod.User.load(admin_context, user_id=user_id)
if user.notify_or_not():
user.do_bill(admin_context)
reason = "The balance is almost use up"
user.set_status(admin_context, user.WARNING, reason)
msg = {'user': user_id, 'notification': reason}
self.notifier.info('billing.notify', msg)
user = self.rpc_client.settle_account(
admin_context, user_id, task=self.NOTIFY)
user_obj = user_mod.User.from_dict(user)
try:
db_api.job_delete(
admin_context, self._generate_job_id(user.id, 'notify'))
admin_context, self._generate_job_id(user_id, self.NOTIFY))
except exception.NotFound as e:
LOG.warn(_("Failed in deleting job: %s") % six.text_type(e))
self.update_user_job(user)
bilean_lock.user_lock_release(user_id, engine_id=self.engine_id)
self.update_jobs(user_obj)
def _daily_task(self, user_id):
res = bilean_lock.user_lock_acquire(user_id, self.engine_id)
if not res:
LOG.error(_LE('Failed grabbing the lock for user %s'), user_id)
return
admin_context = bilean_context.get_admin_context()
user = user_mod.User.load(admin_context, user_id=user_id)
user.do_bill(admin_context)
try:
db_api.job_delete(
admin_context, self._generate_job_id(user.id, 'daily'))
except exception.NotFound as e:
LOG.warn(_("Failed in deleting job: %s") % six.text_type(e))
self.update_user_job(user)
bilean_lock.user_lock_release(user_id, engine_id=self.engine_id)
user = self.rpc_client.settle_account(
admin_context, user_id, task=self.DAILY)
user_obj = user_mod.User.from_dict(user)
self.update_jobs(user_obj)
def _freeze_task(self, user_id):
res = bilean_lock.user_lock_acquire(user_id, self.engine_id)
if not res:
LOG.error(_LE('Failed grabbing the lock for user %s'), user_id)
return
admin_context = bilean_context.get_admin_context()
user = user_mod.User.load(admin_context, user_id=user_id)
user.do_bill(admin_context)
user = self.rpc_client.settle_account(
admin_context, user_id, task=self.FREEZE)
user_obj = user_mod.User.from_dict(user)
try:
db_api.job_delete(
admin_context, self._generate_job_id(user.id, 'freeze'))
admin_context, self._generate_job_id(user_id, self.FREEZE))
except exception.NotFound as e:
LOG.warn(_("Failed in deleting job: %s") % six.text_type(e))
self.update_user_job(user)
bilean_lock.user_lock_release(user_id, engine_id=self.engine_id)
self.update_jobs(user_obj)
def _add_notify_job(self, user):
if not user.rate:
@ -247,11 +219,11 @@ class BileanScheduler(object):
run_date = timeutils.utcnow() + timedelta(seconds=notify_seconds)
job_params = {'run_date': run_date}
job_id = self._generate_job_id(user.id, self.NOTIFY)
self.add_job(self._notify_task, job_id, **job_params)
self._add_job(self._notify_task, job_id, **job_params)
# Save job to database
job = {'id': job_id,
'job_type': self.NOTIFY,
'engine_id': self.engine_id,
'scheduler_id': self.scheduler_id,
'parameters': {'run_date': utils.format_time(run_date)}}
admin_context = bilean_context.get_admin_context()
db_api.job_create(admin_context, job)
@ -263,11 +235,11 @@ class BileanScheduler(object):
run_date = timeutils.utcnow() + timedelta(seconds=total_seconds)
job_params = {'run_date': run_date}
job_id = self._generate_job_id(user.id, self.FREEZE)
self.add_job(self._freeze_task, job_id, **job_params)
self._add_job(self._freeze_task, job_id, **job_params)
# Save job to database
job = {'id': job_id,
'job_type': self.FREEZE,
'engine_id': self.engine_id,
'scheduler_id': self.scheduler_id,
'parameters': {'run_date': utils.format_time(run_date)}}
admin_context = bilean_context.get_admin_context()
db_api.job_create(admin_context, job)
@ -277,47 +249,17 @@ class BileanScheduler(object):
job_id = self._generate_job_id(user.id, self.DAILY)
job_params = {'hour': random.randint(0, 23),
'minute': random.randint(0, 59)}
self.add_job(self._daily_task, job_id, trigger_type='cron',
**job_params)
self._add_job(self._daily_task, job_id,
trigger_type='cron', **job_params)
# Save job to database
job = {'id': job_id,
'job_type': self.DAILY,
'engine_id': self.engine_id,
'scheduler_id': self.scheduler_id,
'parameters': job_params}
admin_context = bilean_context.get_admin_context()
db_api.job_create(admin_context, job)
return True
def update_user_job(self, user):
"""Update user's billing job"""
# Delete all jobs except daily job
admin_context = bilean_context.get_admin_context()
for job_type in self.NOTIFY, self.FREEZE:
job_id = self._generate_job_id(user.id, job_type)
try:
if self.is_exist(job_id):
self.remove_job(job_id)
db_api.job_delete(admin_context, job_id)
except Exception as e:
LOG.warn(_("Failed in deleting job: %s") % six.text_type(e))
if user.status == user.ACTIVE:
self._add_notify_job(user)
elif user.status == user.WARNING:
self._add_freeze_job(user)
def delete_user_jobs(self, user):
"""Delete all jobs related the specific user."""
admin_context = bilean_context.get_admin_context()
for job_type in self.job_types:
job_id = self._generate_job_id(user.id, job_type)
try:
if self.is_exist(job_id):
self.remove_job(job_id)
db_api.job_delete(admin_context, job_id)
except Exception as e:
LOG.warn(_("Failed in deleting job: %s") % six.text_type(e))
def _generate_timer(self):
"""Generate a random timer include hour and minute."""
hour = random.randint(0, 23)
@ -328,6 +270,36 @@ class BileanScheduler(object):
"""Generate job id by given user_id and job type"""
return "%s-%s" % (job_type, user_id)
def update_jobs(self, user):
"""Update user's billing job"""
# Delete all jobs except daily job
admin_context = bilean_context.get_admin_context()
for job_type in self.NOTIFY, self.FREEZE:
job_id = self._generate_job_id(user.id, job_type)
try:
if self._is_exist(job_id):
self._remove_job(job_id)
db_api.job_delete(admin_context, job_id)
except Exception as e:
LOG.warn(_("Failed in deleting job: %s") % six.text_type(e))
if user.status == user.ACTIVE:
self._add_notify_job(user)
elif user.status == user.WARNING:
self._add_freeze_job(user)
def delete_jobs(self, user):
"""Delete all jobs related the specific user."""
admin_context = bilean_context.get_admin_context()
for job_type in self.job_types:
job_id = self._generate_job_id(user.id, job_type)
try:
if self._is_exist(job_id):
self._remove_job(job_id)
db_api.job_delete(admin_context, job_id)
except Exception as e:
LOG.warn(_("Failed in deleting job: %s") % six.text_type(e))
def list_opts():
yield scheduler_group.name, scheduler_opts

View File

@ -0,0 +1,86 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
import socket
from oslo_log import log as logging
import oslo_messaging
from oslo_service import service
from bilean.common import consts
from bilean.common.i18n import _LE
from bilean.common.i18n import _LI
from bilean.common import messaging as rpc_messaging
from bilean.engine import user as user_mod
from bilean.scheduler import cron_scheduler
LOG = logging.getLogger(__name__)
class SchedulerService(service.Service):
def __init__(self, host, topic, manager=None, context=None):
super(SchedulerService, self).__init__()
self.host = host
self.topic = topic
self.scheduler_id = None
self.scheduler = None
self.target = None
self._rpc_server = None
def start(self):
self.scheduler_id = socket.gethostname()
self.scheduler = cron_scheduler.CronScheduler(
scheduler_id=self.scheduler_id)
LOG.info(_LI("Starting billing scheduler"))
self.scheduler.init_scheduler()
self.scheduler.start()
LOG.info(_LI("Starting rpc server for bilean scheduler service"))
self.target = oslo_messaging.Target(version=consts.RPC_API_VERSION,
server=self.scheduler_id,
topic=self.topic)
self._rpc_server = rpc_messaging.get_rpc_server(self.target, self)
self._rpc_server.start()
super(SchedulerService, self).start()
def _stop_rpc_server(self):
# Stop RPC connection to prevent new requests
LOG.info(_LI("Stopping scheduler service..."))
try:
self._rpc_server.stop()
self._rpc_server.wait()
LOG.info(_LI('Scheduler service stopped successfully'))
except Exception as ex:
LOG.error(_LE('Failed to stop scheduler service: %s'),
six.text_type(ex))
def stop(self):
self._stop_rpc_server()
LOG.info(_LI("Stopping billing scheduler"))
self.scheduler.stop()
super(SchedulerService, self).stop()
def update_jobs(self, ctxt, user):
user_obj = user_mod.User.from_dict(user)
self.scheduler.update_jobs(user_obj)
def delete_jobs(self, ctxt, user):
user_obj = user_mod.User.from_dict(user)
self.scheduler.delete_jobs(user_obj)

View File

@ -1,27 +0,0 @@
#!/usr/bin/env python
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir,
os.pardir))
if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'bilean', '__init__.py')):
sys.path.insert(0, POSSIBLE_TOPDIR)
from bilean.cmd import manage
manage.main()

View File

@ -1,11 +1,10 @@
[metadata]
name = bilean
version = 2015.2
summary = OpenStack Billing Service
description-file =
README.rst
author = OpenStack
author-email = dongbing.lv@kylin-cloud.com
author-email = openstack-dev@lists.openstack.org
home-page = http://www.openstack.org/
classifier =
Environment :: OpenStack
@ -16,23 +15,26 @@ classifier =
Programming Language :: Python
Programming Language :: Python :: 2
Programming Language :: Python :: 2.7
Programming Language :: Python :: 2.6
Programming Language :: Python :: 3
Programming Language :: Python :: 3.4
[files]
packages =
bilean
scripts =
bin/bilean-api
bin/bilean-engine
bin/bilean-notification
bin/bilean-manage
[entry_points]
console_scripts =
bilean-api = bilean.cmd.api:main
bilean-engine = bilean.cmd.engine:main
bilean-scheduler = bilean.cmd.scheduler:main
bilean-notification = bilean.cmd.notification:main
bilean-manage = bilean.cmd.manage:main
oslo.config.opts =
bilean.common.config = bilean.common.config:list_opts
bilean.common.wsgi = bilean.common.wsgi:list_opts
bilean.api.middleware.ssl = bilean.api.middleware.ssl:list_opts
bilean.engine.scheduler = bilean.engine.scheduler:list_opts
bilean.scheduler.cron_scheduler = bilean.scheduler.cron_scheduler:list_opts
bilean.notification.converter = bilean.notification.converter:list_opts
bilean.drivers =

View File

@ -34,7 +34,7 @@ commands = {posargs}
commands = python setup.py build_sphinx
[flake8]
ignore = E731
ignore = E731, E402
exclude = .venv,.git,.tox,cover,dist,*lib/python*,*egg,tools,build
max-complexity=20