From 1782a3d6943dda79c3f6a20cff17c1ac066ddcde Mon Sep 17 00:00:00 2001 From: lvdongbing Date: Fri, 25 Mar 2016 02:57:35 -0400 Subject: [PATCH] Separate scheduler as a service Separate scheduler as a service to improve engine's performance. Change-Id: I76446a08036df886c8d7e14430d08ed901901799 --- bilean/api/openstack/v1/policies.py | 14 +- bilean/api/openstack/v1/resources.py | 7 +- bilean/api/openstack/v1/rules.py | 4 +- bilean/api/openstack/v1/users.py | 4 +- bin/bilean-api => bilean/cmd/api.py | 18 +- bin/bilean-engine => bilean/cmd/engine.py | 17 +- .../cmd/notification.py | 14 +- bilean/cmd/scheduler.py | 46 ++++ bilean/common/consts.py | 6 +- bilean/db/api.py | 4 +- bilean/db/sqlalchemy/api.py | 6 +- .../versions/002_add_job_table.py | 2 +- bilean/db/sqlalchemy/models.py | 2 +- bilean/drivers/openstack/sdk.py | 4 +- bilean/engine/event.py | 19 +- bilean/engine/policy.py | 10 +- bilean/engine/service.py | 51 ++--- bilean/engine/user.py | 68 ++++-- bilean/notification/action.py | 6 +- bilean/notification/endpoint.py | 2 +- bilean/resources/base.py | 8 +- bilean/rpc/client.py | 11 + bilean/rules/base.py | 8 +- bilean/rules/os/nova/server.py | 2 +- bilean/scheduler/__init__.py | 51 +++++ .../cron_scheduler.py} | 200 ++++++++---------- bilean/scheduler/service.py | 86 ++++++++ bin/bilean-manage | 27 --- setup.cfg | 20 +- tox.ini | 2 +- 30 files changed, 434 insertions(+), 285 deletions(-) rename bin/bilean-api => bilean/cmd/api.py (80%) mode change 100755 => 100644 rename bin/bilean-engine => bilean/cmd/engine.py (66%) mode change 100755 => 100644 rename bin/bilean-notification => bilean/cmd/notification.py (70%) mode change 100755 => 100644 create mode 100644 bilean/cmd/scheduler.py create mode 100644 bilean/scheduler/__init__.py rename bilean/{engine/scheduler.py => scheduler/cron_scheduler.py} (71%) create mode 100644 bilean/scheduler/service.py delete mode 100755 bin/bilean-manage diff --git a/bilean/api/openstack/v1/policies.py b/bilean/api/openstack/v1/policies.py index da74a5b..353295a 100644 --- a/bilean/api/openstack/v1/policies.py +++ b/bilean/api/openstack/v1/policies.py @@ -35,10 +35,10 @@ class PolicyData(object): return self.data[consts.POLICY_NAME] def rules(self): - return self.data.get(consts.POLICY_RULES, None) + return self.data.get(consts.POLICY_RULES) def metadata(self): - return self.data.get(consts.RULE_METADATA, None) + return self.data.get(consts.RULE_METADATA) class PolicyController(object): @@ -106,7 +106,7 @@ class PolicyController(object): if not validator.is_valid_body(body): raise exc.HTTPUnprocessableEntity() - policy_data = body.get('policy', None) + policy_data = body.get('policy') if policy_data is None: raise exc.HTTPBadRequest(_("Malformed request data, missing " "'policy' key in request body.")) @@ -123,14 +123,14 @@ class PolicyController(object): if not validator.is_valid_body(body): raise exc.HTTPUnprocessableEntity() - policy_data = body.get('policy', None) + policy_data = body.get('policy') if policy_data is None: raise exc.HTTPBadRequest(_("Malformed request data, missing " "'policy' key in request body.")) - name = policy_data.get(consts.POLICY_NAME, None) - metadata = policy_data.get(consts.POLICY_METADATA, None) - is_default = policy_data.get(consts.POLICY_IS_DEFAULT, None) + name = policy_data.get(consts.POLICY_NAME) + metadata = policy_data.get(consts.POLICY_METADATA) + is_default = policy_data.get(consts.POLICY_IS_DEFAULT) policy = self.rpc_client.policy_update(req.context, policy_id, name, metadata, is_default) diff --git a/bilean/api/openstack/v1/resources.py b/bilean/api/openstack/v1/resources.py index 0c15391..1e40372 100644 --- a/bilean/api/openstack/v1/resources.py +++ b/bilean/api/openstack/v1/resources.py @@ -94,13 +94,14 @@ class ResourceController(object): if not validator.is_valid_body(body): raise exc.HTTPUnprocessableEntity() - resources = body.get('resources', None) + resources = body.get('resources') if not resources: msg = _("Resources is empty") raise exc.HTTPBadRequest(explanation=msg) - if body.get('count', None): + count = body.get('count') + if count: try: - validator.validate_integer(body.get('count'), 'count', + validator.validate_integer(count, 'count', consts.MIN_RESOURCE_NUM, consts.MAX_RESOURCE_NUM) except exception.InvalidInput as e: diff --git a/bilean/api/openstack/v1/rules.py b/bilean/api/openstack/v1/rules.py index bf48638..ffd60de 100644 --- a/bilean/api/openstack/v1/rules.py +++ b/bilean/api/openstack/v1/rules.py @@ -40,7 +40,7 @@ class RuleData(object): return self.data[consts.RULE_SPEC] def metadata(self): - return self.data.get(consts.RULE_METADATA, None) + return self.data.get(consts.RULE_METADATA) class RuleController(object): @@ -103,7 +103,7 @@ class RuleController(object): if not validator.is_valid_body(body): raise exc.HTTPUnprocessableEntity() - rule_data = body.get('rule', None) + rule_data = body.get('rule') if rule_data is None: raise exc.HTTPBadRequest(_("Malformed request data, missing " "'rule' key in request body.")) diff --git a/bilean/api/openstack/v1/users.py b/bilean/api/openstack/v1/users.py index 33499bc..83cd368 100644 --- a/bilean/api/openstack/v1/users.py +++ b/bilean/api/openstack/v1/users.py @@ -101,14 +101,14 @@ class UserController(object): raise exc.HTTPBadRequest(msg) if action == self.ATTACH_POLICY: - policy = body.get(action).get('policy', None) + policy = body.get(action).get('policy') if policy is None: raise exc.HTTPBadRequest(_("Malformed request data, no policy " "specified to attach.")) user = self.rpc_client.user_attach_policy( req.context, user_id, policy) elif action == self.RECHARGE: - value = body.get(action).get('value', None) + value = body.get(action).get('value') if value is None: raise exc.HTTPBadRequest(_("Malformed request data, missing " "'value' key in request body.")) diff --git a/bin/bilean-api b/bilean/cmd/api.py old mode 100755 new mode 100644 similarity index 80% rename from bin/bilean-api rename to bilean/cmd/api.py index d6ba6c2..804b285 --- a/bin/bilean-api +++ b/bilean/cmd/api.py @@ -13,21 +13,16 @@ # under the License. """ -Bilean API Server. An OpenStack ReST API to Bilean +Bilean API Server. + +An OpenStack ReST API to Bilean """ import eventlet eventlet.monkey_patch(os=False) -import os import sys -possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), - os.pardir, - os.pardir)) -if os.path.exists(os.path.join(possible_topdir, 'bilean', '__init__.py')): - sys.path.insert(0, possible_topdir) - from bilean.common import config from bilean.common.i18n import _LI from bilean.common import messaging @@ -35,19 +30,18 @@ from bilean.common import wsgi from bilean import version from oslo_config import cfg -from oslo_i18n import _lazy +import oslo_i18n as i18n from oslo_log import log as logging from oslo_service import systemd - import six -_lazy.enable_lazy() +i18n.enable_lazy() LOG = logging.getLogger('bilean.api') -if __name__ == "__main__": +def main(): try: logging.register_options(cfg.CONF) cfg.CONF(project='bilean', prog='bilean-api', diff --git a/bin/bilean-engine b/bilean/cmd/engine.py old mode 100755 new mode 100644 similarity index 66% rename from bin/bilean-engine rename to bilean/cmd/engine.py index e7f9989..918b76e --- a/bin/bilean-engine +++ b/bilean/cmd/engine.py @@ -13,25 +13,12 @@ # under the License. """ -Bilean Engine Server. This does the work of actually implementing the API -calls made by the user. Normal communications is done via the bilean API -which then calls into this engine. +Bilean Engine Server. """ import eventlet eventlet.monkey_patch() -import os -import sys - -# If ../bilean/__init__.py exists, add ../ to Python search path, so that -# it will override what happens to be installed in /usr/(local/)lib/python... -POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), - os.pardir, - os.pardir)) -if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'bilean', '__init__.py')): - sys.path.insert(0, POSSIBLE_TOPDIR) - from bilean.common import consts from bilean.common import messaging @@ -45,7 +32,7 @@ _lazy.enable_lazy() LOG = logging.getLogger('bilean.engine') -if __name__ == '__main__': +def main(): logging.register_options(cfg.CONF) cfg.CONF(project='bilean', prog='bilean-engine') logging.setup(cfg.CONF, 'bilean-engine') diff --git a/bin/bilean-notification b/bilean/cmd/notification.py old mode 100755 new mode 100644 similarity index 70% rename from bin/bilean-notification rename to bilean/cmd/notification.py index c98a378..a22c4d2 --- a/bin/bilean-notification +++ b/bilean/cmd/notification.py @@ -15,23 +15,11 @@ import eventlet eventlet.monkey_patch() -import os -import sys - -# If ../bilean/__init__.py exists, add ../ to Python search path, so that -# it will override what happens to be installed in /usr/(local/)lib/python... -POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), - os.pardir, - os.pardir)) -if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'bilean', '__init__.py')): - sys.path.insert(0, POSSIBLE_TOPDIR) - from oslo_config import cfg from oslo_i18n import _lazy from oslo_log import log as logging from oslo_service import service -from bilean.common import config from bilean.common import messaging _lazy.enable_lazy() @@ -39,7 +27,7 @@ _lazy.enable_lazy() LOG = logging.getLogger('bilean.notification') -if __name__ == "__main__": +def main(): logging.register_options(cfg.CONF) cfg.CONF(project='bilean', prog='bilean-notification') logging.setup(cfg.CONF, 'bilean-notification') diff --git a/bilean/cmd/scheduler.py b/bilean/cmd/scheduler.py new file mode 100644 index 0000000..729ccb7 --- /dev/null +++ b/bilean/cmd/scheduler.py @@ -0,0 +1,46 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Bilean Scheduler Server. +""" + +import eventlet +eventlet.monkey_patch() + +from bilean.common import consts +from bilean.common import messaging + +from oslo_config import cfg +from oslo_i18n import _lazy +from oslo_log import log as logging +from oslo_service import service + +_lazy.enable_lazy() + +LOG = logging.getLogger('bilean.scheduler') + + +def main(): + logging.register_options(cfg.CONF) + cfg.CONF(project='bilean', prog='bilean-scheduler') + logging.setup(cfg.CONF, 'bilean-scheduler') + logging.set_defaults() + messaging.setup() + + from bilean.scheduler import service as scheduler + + srv = scheduler.SchedulerService(cfg.CONF.host, consts.SCHEDULER_TOPIC) + launcher = service.launch(cfg.CONF, srv) + launcher.wait() diff --git a/bilean/common/consts.py b/bilean/common/consts.py index 0070fe0..d15d97a 100644 --- a/bilean/common/consts.py +++ b/bilean/common/consts.py @@ -19,14 +19,14 @@ MAX_RESOURCE_NUM = 1000 RPC_ATTRs = ( ENGINE_TOPIC, - ENGINE_HEALTH_MGR_TOPIC, + SCHEDULER_TOPIC, NOTIFICATION_TOPICS, RPC_API_VERSION, ) = ( 'bilean-engine', - 'engine-health_mgr', + 'bilean-scheduler', 'billing_notifications', - '1.0', + '1.1', ) RPC_PARAMS = ( diff --git a/bilean/db/api.py b/bilean/db/api.py index 1c19494..c6a0f86 100644 --- a/bilean/db/api.py +++ b/bilean/db/api.py @@ -154,8 +154,8 @@ def job_create(context, values): return IMPL.job_create(context, values) -def job_get_all(context, engine_id=None): - return IMPL.job_get_all(context, engine_id=engine_id) +def job_get_all(context, scheduler_id=None): + return IMPL.job_get_all(context, scheduler_id=scheduler_id) def job_delete(context, job_id): diff --git a/bilean/db/sqlalchemy/api.py b/bilean/db/sqlalchemy/api.py index 2c0d3a5..91c8e2b 100644 --- a/bilean/db/sqlalchemy/api.py +++ b/bilean/db/sqlalchemy/api.py @@ -395,10 +395,10 @@ def job_create(context, values): return job_ref -def job_get_all(context, engine_id=None): +def job_get_all(context, scheduler_id=None): query = model_query(context, models.Job) - if engine_id: - query = query.filter_by(engine_id=engine_id) + if scheduler_id: + query = query.filter_by(scheduler_id=scheduler_id) return query.all() diff --git a/bilean/db/sqlalchemy/migrate_repo/versions/002_add_job_table.py b/bilean/db/sqlalchemy/migrate_repo/versions/002_add_job_table.py index 82f1d48..a39bc75 100644 --- a/bilean/db/sqlalchemy/migrate_repo/versions/002_add_job_table.py +++ b/bilean/db/sqlalchemy/migrate_repo/versions/002_add_job_table.py @@ -28,7 +28,7 @@ def upgrade(migrate_engine): 'job', meta, sqlalchemy.Column('id', sqlalchemy.String(50), primary_key=True, nullable=False), - sqlalchemy.Column('engine_id', sqlalchemy.String(36), + sqlalchemy.Column('scheduler_id', sqlalchemy.String(36), nullable=False), sqlalchemy.Column('job_type', sqlalchemy.String(10), nullable=False), diff --git a/bilean/db/sqlalchemy/models.py b/bilean/db/sqlalchemy/models.py index e320e0c..7b1c74f 100644 --- a/bilean/db/sqlalchemy/models.py +++ b/bilean/db/sqlalchemy/models.py @@ -186,7 +186,7 @@ class Job(BASE, BileanBase): id = sqlalchemy.Column(sqlalchemy.String(50), primary_key=True, unique=True) - engine_id = sqlalchemy.Column(sqlalchemy.String(36)) + scheduler_id = sqlalchemy.Column(sqlalchemy.String(36)) job_type = sqlalchemy.Column(sqlalchemy.String(10)) parameters = sqlalchemy.Column(types.Dict()) diff --git a/bilean/drivers/openstack/sdk.py b/bilean/drivers/openstack/sdk.py index cd50eec..2de6148 100644 --- a/bilean/drivers/openstack/sdk.py +++ b/bilean/drivers/openstack/sdk.py @@ -54,7 +54,7 @@ def parse_exception(ex): if data: code = data.get('code', code) message = data.get('message', message) - error = data.get('error', None) + error = data.get('error') if error: code = data.get('code', code) message = data['error'].get('message', message) @@ -95,7 +95,7 @@ def create_connection(params=None): if params is None: params = {} - if params.get('token', None): + if params.get('token'): auth_plugin = 'token' else: auth_plugin = 'password' diff --git a/bilean/engine/event.py b/bilean/engine/event.py index 648064a..b11ce7c 100644 --- a/bilean/engine/event.py +++ b/bilean/engine/event.py @@ -30,9 +30,9 @@ class Event(object): def __init__(self, timestamp, **kwargs): self.timestamp = timestamp - self.user_id = kwargs.get('user_id', None) - self.action = kwargs.get('action', None) - self.resource_type = kwargs.get('resource_type', None) + self.user_id = kwargs.get('user_id') + self.action = kwargs.get('action') + self.resource_type = kwargs.get('resource_type') self.value = kwargs.get('value', 0) @classmethod @@ -82,6 +82,7 @@ class Event(object): def store(self, context): '''Store the event into database and return its ID.''' values = { + 'timestamp': self.timestamp, 'user_id': self.user_id, 'action': self.action, 'resource_type': self.resource_type, @@ -125,13 +126,21 @@ def record(context, user_id, action=None, seconds=0, value=0): if action == 'charge': resources = resource_base.Resource.load_all( context, user_id=user_id, project_safe=False) + + res_mapping = {} for resource in resources: usage = resource.rate * seconds + if res_mapping.get(resource.resource_type) is None: + res_mapping[resource.resource_type] = usage + else: + res_mapping[resource.resource_type] += usage + + for res_type in res_mapping.keys(): event = Event(timeutils.utcnow(), user_id=user_id, action=action, - resource_type=resource.resource_type, - value=usage) + resource_type=res_type, + value=res_mapping.get(res_type)) event.store(context) elif action == 'recharge': event = Event(timeutils.utcnow(), diff --git a/bilean/engine/policy.py b/bilean/engine/policy.py index 2f891e7..56884c1 100644 --- a/bilean/engine/policy.py +++ b/bilean/engine/policy.py @@ -23,15 +23,15 @@ class Policy(object): def __init__(self, name, **kwargs): self.name = name - self.id = kwargs.get('id', None) + self.id = kwargs.get('id') self.is_default = kwargs.get('is_default', False) # rules schema like [{'id': 'xxx', 'type': 'os.nova.server'}] self.rules = kwargs.get('rules', []) - self.metadata = kwargs.get('metadata', None) + self.metadata = kwargs.get('metadata') - self.created_at = kwargs.get('created_at', None) - self.updated_at = kwargs.get('updated_at', None) - self.deleted_at = kwargs.get('deleted_at', None) + self.created_at = kwargs.get('created_at') + self.updated_at = kwargs.get('updated_at') + self.deleted_at = kwargs.get('deleted_at') def store(self, context): """Store the policy record into database table.""" diff --git a/bilean/engine/service.py b/bilean/engine/service.py index 2d7e591..78b1d71 100644 --- a/bilean/engine/service.py +++ b/bilean/engine/service.py @@ -19,6 +19,7 @@ from oslo_log import log as logging import oslo_messaging from oslo_service import service +from bilean.common import consts from bilean.common import context as bilean_context from bilean.common import exception from bilean.common.i18n import _ @@ -31,10 +32,10 @@ from bilean.engine import environment from bilean.engine import event as event_mod from bilean.engine import lock as bilean_lock from bilean.engine import policy as policy_mod -from bilean.engine import scheduler from bilean.engine import user as user_mod from bilean.resources import base as resource_base from bilean.rules import base as rule_base +from bilean import scheduler as bilean_scheduler LOG = logging.getLogger(__name__) @@ -63,14 +64,11 @@ class EngineService(service.Service): by the RPC caller. """ - RPC_API_VERSION = '1.1' - def __init__(self, host, topic, manager=None, context=None): super(EngineService, self).__init__() self.host = host self.topic = topic - self.scheduler = None self.engine_id = None self.target = None self._rpc_server = None @@ -78,18 +76,8 @@ class EngineService(service.Service): def start(self): self.engine_id = socket.gethostname() - LOG.info(_LI("Initialise bilean users from keystone.")) - admin_context = bilean_context.get_admin_context() - user_mod.User.init_users(admin_context) - - self.scheduler = scheduler.BileanScheduler(engine_id=self.engine_id) - LOG.info(_LI("Starting billing scheduler for engine: %s"), - self.engine_id) - self.scheduler.init_scheduler() - self.scheduler.start() - LOG.info(_LI("Starting rpc server for engine: %s"), self.engine_id) - target = oslo_messaging.Target(version=self.RPC_API_VERSION, + target = oslo_messaging.Target(version=consts.RPC_API_VERSION, server=self.host, topic=self.topic) self.target = target @@ -111,11 +99,6 @@ class EngineService(service.Service): def stop(self): self._stop_rpc_server() - - LOG.info(_LI("Stopping billing scheduler for engine: %s"), - self.engine_id) - self.scheduler.stop() - super(EngineService, self).stop() @request_context @@ -162,7 +145,8 @@ class EngineService(service.Service): user.do_recharge(cnxt, value) # As user has been updated, the billing job for the user # should to be updated too. - self.scheduler.update_user_job(user) + bilean_scheduler.notify(bilean_scheduler.UPDATE_JOBS, + user=user.to_dict()) finally: bilean_lock.user_lock_release(user_id, engine_id=self.engine_id) @@ -176,7 +160,8 @@ class EngineService(service.Service): LOG.error(_LE("User (%s) is in use, can not delete."), user_id) return user_mod.User.delete(cnxt, user_id=user_id) - self.scheduler.delete_user_jobs(user) + bilean_scheduler.notify(bilean_scheduler.DELETE_JOBS, + user=user.to_dict()) @request_context def user_attach_policy(self, cnxt, user_id, policy_id): @@ -321,7 +306,8 @@ class EngineService(service.Service): # As the rate of user has changed, the billing job for the user # should change too. - self.scheduler.update_user_job(user) + bilean_scheduler.notify(bilean_scheduler.UPDATE_JOBS, + user=user.to_dict()) finally: bilean_lock.user_lock_release(user.id, engine_id=self.engine_id) @@ -371,7 +357,8 @@ class EngineService(service.Service): res.store(admin_context) - self.scheduler.update_user_job(user) + bilean_scheduler.notify(bilean_scheduler.UPDATE_JOBS, + user=user.to_dict()) finally: bilean_lock.user_lock_release(user.id, engine_id=self.engine_id) @@ -394,7 +381,8 @@ class EngineService(service.Service): user = user_mod.User.load(admin_context, user_id=res.user_id) user.update_with_resource(admin_context, res, action='delete') - self.scheduler.update_user_job(user) + bilean_scheduler.notify(bilean_scheduler.UPDATE_JOBS, + user=user.to_dict()) res.delete(admin_context) finally: @@ -563,3 +551,16 @@ class EngineService(service.Service): def policy_delete(self, cnxt, policy_id): LOG.info(_LI("Deleting policy: '%s'."), policy_id) policy_mod.Policy.delete(cnxt, policy_id) + + def settle_account(self, cnxt, user_id, task=None): + res = bilean_lock.user_lock_acquire(user_id, self.engine_id) + if not res: + LOG.error(_LE('Failed grabbing the lock for user %s'), user_id) + return + try: + user = user_mod.User.load(cnxt, user_id=user_id) + user.settle_account(cnxt, task=task) + finally: + bilean_lock.user_lock_release(user_id, engine_id=self.engine_id) + + return user.to_dict() diff --git a/bilean/engine/user.py b/bilean/engine/user.py index 6505b58..d385d60 100644 --- a/bilean/engine/user.py +++ b/bilean/engine/user.py @@ -20,6 +20,7 @@ from bilean.common import utils from bilean.db import api as db_api from bilean.drivers import base as driver_base from bilean.engine import event as event_mod +from bilean import notifier as bilean_notifier from bilean.resources import base as resource_base from oslo_config import cfg @@ -40,18 +41,18 @@ class User(object): def __init__(self, user_id, **kwargs): self.id = user_id - self.policy_id = kwargs.get('policy_id', None) + self.policy_id = kwargs.get('policy_id') self.balance = kwargs.get('balance', 0) self.rate = kwargs.get('rate', 0.0) self.credit = kwargs.get('credit', 0) - self.last_bill = kwargs.get('last_bill', None) + self.last_bill = kwargs.get('last_bill') self.status = kwargs.get('status', self.INIT) self.status_reason = kwargs.get('status_reason', 'Init user') - self.created_at = kwargs.get('created_at', None) - self.updated_at = kwargs.get('updated_at', None) - self.deleted_at = kwargs.get('deleted_at', None) + self.created_at = kwargs.get('created_at') + self.updated_at = kwargs.get('updated_at') + self.deleted_at = kwargs.get('deleted_at') def store(self, context): """Store the user record into database table.""" @@ -97,7 +98,8 @@ class User(object): user = cls(pid, status=cls.INIT, status_reason='Init from keystone') user.store(context) - return True + users.append(user) + return users @classmethod def _from_db_record(cls, record): @@ -165,6 +167,10 @@ class User(object): return True return False + @classmethod + def from_dict(cls, values): + return cls(values.get('id'), **values) + def to_dict(self): user_dict = { 'id': self.id, @@ -188,11 +194,9 @@ class User(object): self.status_reason = reason self.store(context) - def update_with_resource(self, context, resource, do_bill=True, - action='create'): + def update_with_resource(self, context, resource, action='create'): '''Update user with resource''' - if do_bill: - self.do_bill(context) + self._settle_account(context) if 'create' == action: d_rate = resource.rate @@ -217,6 +221,7 @@ class User(object): self.status = self.FREE elif new_rate == 0 and self.balance < 0: self.status = self.FREEZE + self.satus_reason = "Balance overdraft" elif self.status == self.WARNING: if not self.notify_or_not(): reason = _("Status change from 'warning' to 'active' " @@ -228,23 +233,32 @@ class User(object): def do_recharge(self, context, value): '''Do recharge for user.''' if self.rate > 0 and self.status != self.FREEZE: - self.do_bill(context) + self._settle_account(context) self.balance += value + if self.status == self.INIT and self.balance > 0: - self.set_status(context, self.FREE, reason='Recharged') + self.status = self.FREE + self.status_reason = "Recharged" elif self.status == self.FREEZE and self.balance > 0: reason = _("Status change from 'freeze' to 'free' because " "of recharge.") - self.set_status(context, self.FREE, reason=reason) + self.status = self.FREE + self.status_reason = reason elif self.status == self.WARNING: if not self.notify_or_not(): reason = _("Status change from 'warning' to 'active' because " "of recharge.") - self.set_status(context, self.ACTIVE, reason=reason) + self.status = self.ACTIVE + self.status_reason = reason + + self.store(context) event_mod.record(context, self.id, action='recharge', value=value) def notify_or_not(self): '''Check if user should be notified.''' + cfg.CONF.import_opt('prior_notify_time', + 'bilean.scheduler.cron_scheduler', + group='scheduler') prior_notify_time = cfg.CONF.scheduler.prior_notify_time * 3600 rest_usage = prior_notify_time * self.rate if self.balance > rest_usage: @@ -255,10 +269,9 @@ class User(object): db_api.user_delete(context, self.id) return True - def do_bill(self, context): - '''Do bill once, pay the cost until now.''' + def _settle_account(self, context): if self.status not in [self.ACTIVE, self.WARNING]: - LOG.info(_LI("Ignore bill action because user is in '%s' " + LOG.info(_LI("Ignore settlement action because user is in '%s' " "status."), self.status) return @@ -268,9 +281,6 @@ class User(object): if cost > 0: self.balance -= cost self.last_bill = now - if self.balance <= 0: - self._freeze(context, reason="Balance overdraft") - self.store(context) event_mod.record(context, self.id, action='charge', seconds=total_seconds) @@ -283,3 +293,21 @@ class User(object): for resource in resources: if resource.do_delete(context): self._change_user_rate(context, -resource.rate) + + def settle_account(self, context, task=None): + '''Settle account for user.''' + notifier = bilean_notifier.Notifier() + self._settle_account(context) + + if task == 'notify' and self.notify_or_not(): + self.status_reason = "The balance is almost used up" + self.status = self.WARNING + # Notify user + msg = {'user': self.id, 'notification': self.status_reason} + notifier.info('billing.notify', msg) + elif task == 'freeze' and self.balance <= 0: + self._freeze(context, reason="Balance overdraft") + msg = {'user': self.id, 'notification': self.status_reason} + notifier.info('billing.notify', msg) + + self.store(context) diff --git a/bilean/notification/action.py b/bilean/notification/action.py index 2eb1af8..ffbdf18 100644 --- a/bilean/notification/action.py +++ b/bilean/notification/action.py @@ -54,9 +54,9 @@ class ResourceAction(Action): def __init__(self, cnxt, action, data): super(ResourceAction, self).__init__(cnxt, action, data) - self.id = data.get('resource_ref', None) - self.user_id = data.get('user_id', None) - self.resource_type = data.get('resource_type', None) + self.id = data.get('resource_ref') + self.user_id = data.get('user_id') + self.resource_type = data.get('resource_type') self.properties = {} self._parse_and_validate() diff --git a/bilean/notification/endpoint.py b/bilean/notification/endpoint.py index d5401bd..b484b0c 100644 --- a/bilean/notification/endpoint.py +++ b/bilean/notification/endpoint.py @@ -51,7 +51,7 @@ class EventsNotificationEndpoint(object): def process_identity_notification(self, notification): """Convert notification to user.""" - user_id = notification['payload'].get('resource_info', None) + user_id = notification['payload'].get('resource_info') if not user_id: LOG.error(_LE("Cannot retrieve user_id from notification: %s"), notification) diff --git a/bilean/resources/base.py b/bilean/resources/base.py index b7c5e3e..17fe0b8 100644 --- a/bilean/resources/base.py +++ b/bilean/resources/base.py @@ -40,13 +40,13 @@ class Resource(object): self.resource_type = resource_type self.properties = properties - self.rule_id = kwargs.get('rule_id', None) + self.rule_id = kwargs.get('rule_id') self.rate = kwargs.get('rate', 0) self.d_rate = 0 - self.created_at = kwargs.get('created_at', None) - self.updated_at = kwargs.get('updated_at', None) - self.deleted_at = kwargs.get('deleted_at', None) + self.created_at = kwargs.get('created_at') + self.updated_at = kwargs.get('updated_at') + self.deleted_at = kwargs.get('deleted_at') def store(self, context): """Store the resource record into database table.""" diff --git a/bilean/rpc/client.py b/bilean/rpc/client.py index 91667db..1184416 100644 --- a/bilean/rpc/client.py +++ b/bilean/rpc/client.py @@ -29,6 +29,7 @@ class EngineClient(object): BASE_RPC_API_VERSION = '1.0' def __init__(self): + cfg.CONF.import_opt('host', 'bilean.common.config') self._client = messaging.get_rpc_client( topic=consts.ENGINE_TOPIC, server=cfg.CONF.host, @@ -54,6 +55,7 @@ class EngineClient(object): client = self._client return client.cast(ctxt, method, **kwargs) + # users def user_list(self, ctxt, show_deleted=False, limit=None, marker=None, sort_keys=None, sort_dir=None, filters=None): @@ -88,6 +90,7 @@ class EngineClient(object): user_id=user_id, policy_id=policy_id)) + # rules def rule_list(self, ctxt, limit=None, marker=None, sort_keys=None, sort_dir=None, filters=None, show_deleted=False): return self.call(ctxt, self.make_msg('rule_list', limit=limit, @@ -113,6 +116,7 @@ class EngineClient(object): return self.call(ctxt, self.make_msg('rule_delete', rule_id=rule_id)) + # resources def resource_list(self, ctxt, user_id=None, limit=None, marker=None, sort_keys=None, sort_dir=None, filters=None, project_safe=True, show_deleted=False): @@ -144,6 +148,7 @@ class EngineClient(object): return self.call(ctxt, self.make_msg('resource_delete', resource_id=resource_id)) + # events def event_list(self, ctxt, user_id=None, limit=None, marker=None, sort_keys=None, sort_dir=None, filters=None, start_time=None, end_time=None, project_safe=True, @@ -162,6 +167,7 @@ class EngineClient(object): return self.call(cnxt, self.make_msg('validate_creation', resources=resources)) + # policies def policy_list(self, ctxt, limit=None, marker=None, sort_keys=None, sort_dir=None, filters=None, show_deleted=False): return self.call(ctxt, self.make_msg('policy_list', limit=limit, @@ -197,3 +203,8 @@ class EngineClient(object): return self.call(ctxt, self.make_msg('policy_add_rules', policy_id=policy_id, rules=rules)) + + def settle_account(self, ctxt, user_id, task=None): + return self.call(ctxt, self.make_msg('settle_account', + user_id=user_id, + task=task)) diff --git a/bilean/rules/base.py b/bilean/rules/base.py index 7ab270f..83a719e 100644 --- a/bilean/rules/base.py +++ b/bilean/rules/base.py @@ -80,14 +80,14 @@ class Rule(object): self.name = name self.spec = spec - self.id = kwargs.get('id', None) + self.id = kwargs.get('id') self.type = kwargs.get('type', '%s-%s' % (type_name, version)) self.metadata = kwargs.get('metadata', {}) - self.created_at = kwargs.get('created_at', None) - self.updated_at = kwargs.get('updated_at', None) - self.deleted_at = kwargs.get('deleted_at', None) + self.created_at = kwargs.get('created_at') + self.updated_at = kwargs.get('updated_at') + self.deleted_at = kwargs.get('deleted_at') self.spec_data = schema.Spec(self.spec_schema, self.spec) self.properties = schema.Spec(self.properties_schema, diff --git a/bilean/rules/os/nova/server.py b/bilean/rules/os/nova/server.py index 9af3a27..f6fdb6b 100644 --- a/bilean/rules/os/nova/server.py +++ b/bilean/rules/os/nova/server.py @@ -73,7 +73,7 @@ class ServerRule(base.Rule): :param: resource: Resource object to find price. ''' - flavor = resource.properties.get('flavor', None) + flavor = resource.properties.get('flavor') if not flavor: raise exception.Error(msg='Flavor should be provided to get ' 'the price of server.') diff --git a/bilean/scheduler/__init__.py b/bilean/scheduler/__init__.py new file mode 100644 index 0000000..14b55dd --- /dev/null +++ b/bilean/scheduler/__init__.py @@ -0,0 +1,51 @@ +# +# Copyright 2012, Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from bilean.common import consts +from bilean.common import messaging + +from oslo_context import context as oslo_context +import oslo_messaging + + +supported_actions = ( + UPDATE_JOBS, DELETE_JOBS, +) = ( + 'update_jobs', 'delete_jobs', +) + + +def notify(method, scheduler_id=None, **kwargs): + '''Send notification to scheduler + + :param method: remote method to call + :param scheduler_id: specify scheduler to notify; None implies broadcast + ''' + if scheduler_id: + # Notify specific scheduler identified by scheduler_id + client = messaging.get_rpc_client( + topic=consts.SCHEDULER_TOPIC, + server=scheduler_id, + version=consts.RPC_API_VERSION) + else: + # Broadcast to all schedulers + client = messaging.get_rpc_client( + topic=consts.SCHEDULER_TOPIC, + version=consts.RPC_API_VERSION) + try: + client.call(oslo_context.get_current(), method, **kwargs) + return True + except oslo_messaging.MessagingTimeout: + return False diff --git a/bilean/engine/scheduler.py b/bilean/scheduler/cron_scheduler.py similarity index 71% rename from bilean/engine/scheduler.py rename to bilean/scheduler/cron_scheduler.py index f055b8d..bef0e7b 100644 --- a/bilean/engine/scheduler.py +++ b/bilean/scheduler/cron_scheduler.py @@ -14,13 +14,11 @@ from bilean.common import context as bilean_context from bilean.common import exception from bilean.common.i18n import _ -from bilean.common.i18n import _LE from bilean.common.i18n import _LI from bilean.common import utils from bilean.db import api as db_api -from bilean.engine import lock as bilean_lock from bilean.engine import user as user_mod -from bilean import notifier +from bilean.rpc import client as rpc_client from oslo_config import cfg from oslo_log import log as logging @@ -61,8 +59,8 @@ cfg.CONF.register_opts(scheduler_opts, group=scheduler_group) LOG = logging.getLogger(__name__) -class BileanScheduler(object): - """Billing scheduler based on apscheduler""" +class CronScheduler(object): + """Cron scheduler based on apscheduler""" job_types = ( NOTIFY, DAILY, FREEZE, @@ -72,39 +70,50 @@ class BileanScheduler(object): trigger_types = (DATE, CRON) = ('date', 'cron') def __init__(self, **kwargs): - super(BileanScheduler, self).__init__() + super(CronScheduler, self).__init__() self._scheduler = BackgroundScheduler() - self.notifier = notifier.Notifier() - self.engine_id = kwargs.get('engine_id', None) + self.scheduler_id = kwargs.get('scheduler_id') + self.rpc_client = rpc_client.EngineClient() if cfg.CONF.scheduler.store_ap_job: self._scheduler.add_jobstore(cfg.CONF.scheduler.backend, url=cfg.CONF.scheduler.connection) + def start(self): + LOG.info(_('Starting Cron scheduler')) + self._scheduler.start() + + def stop(self): + LOG.info(_('Stopping Cron scheduler')) + self._scheduler.shutdown() + def init_scheduler(self): """Init all jobs related to the engine from db.""" admin_context = bilean_context.get_admin_context() jobs = [] or db_api.job_get_all(admin_context, - engine_id=self.engine_id) + scheduler_id=self.scheduler_id) for job in jobs: - if self.is_exist(job.id): + if self._is_exist(job.id): continue task_name = "_%s_task" % (job.job_type) task = getattr(self, task_name) - LOG.info(_LI("Add job '%(job_id)s' to engine '%(engine_id)s'."), - {'job_id': job.id, 'engine_id': self.engine_id}) + LOG.info(_LI("Add job '%(job_id)s' to scheduler '%(id)s'."), + {'job_id': job.id, 'id': self.scheduler_id}) tg_type = self.CRON if job.job_type == self.DAILY else self.DAILY - self.add_job(task, job.id, trigger_type=tg_type, - params=job.parameters) + self._add_job(task, job.id, trigger_type=tg_type, + params=job.parameters) + + LOG.info(_LI("Initialise users from keystone.")) + users = user_mod.User.init_users(admin_context) # Init daily job for all users - users = user_mod.User.load_all(admin_context) - for user in users: - job_id = self._generate_job_id(user.id, self.DAILY) - if self.is_exist(job_id): - continue - self._add_daily_job(user) + if users: + for user in users: + job_id = self._generate_job_id(user.id, self.DAILY) + if self._is_exist(job_id): + continue + self._add_daily_job(user) - def add_job(self, task, job_id, trigger_type='date', **kwargs): + def _add_job(self, task, job_id, trigger_type='date', **kwargs): """Add a job to scheduler by given data. :param str|unicode user_id: used as job_id @@ -128,8 +137,8 @@ class BileanScheduler(object): return # Add a cron type job - hour = kwargs.get('hour', None) - minute = kwargs.get('minute', None) + hour = kwargs.get('hour') + minute = kwargs.get('minute') if not hour or not minute: hour, minute = self._generate_timer() self._scheduler.add_job(task, 'cron', @@ -140,7 +149,7 @@ class BileanScheduler(object): id=job_id, misfire_grace_time=mg_time) - def modify_job(self, job_id, **changes): + def _modify_job(self, job_id, **changes): """Modifies the properties of a single job. Modifications are passed to this method as extra keyword arguments. @@ -150,7 +159,7 @@ class BileanScheduler(object): self._scheduler.modify_job(job_id, **changes) - def remove_job(self, job_id): + def _remove_job(self, job_id): """Removes a job, preventing it from being run any more. :param str|unicode job_id: the identifier of the job @@ -158,15 +167,7 @@ class BileanScheduler(object): self._scheduler.remove_job(job_id) - def start(self): - LOG.info(_('Starting Billing scheduler')) - self._scheduler.start() - - def stop(self): - LOG.info(_('Stopping Billing scheduler')) - self._scheduler.shutdown() - - def is_exist(self, job_id): + def _is_exist(self, job_id): """Returns if the Job exists that matches the given ``job_id``. :param str|unicode job_id: the identifier of the job @@ -177,65 +178,36 @@ class BileanScheduler(object): return job is not None def _notify_task(self, user_id): - res = bilean_lock.user_lock_acquire(user_id, self.engine_id) - if not res: - LOG.error(_LE('Failed grabbing the lock for user %s'), user_id) - return - admin_context = bilean_context.get_admin_context() - user = user_mod.User.load(admin_context, user_id=user_id) - if user.notify_or_not(): - user.do_bill(admin_context) - reason = "The balance is almost use up" - user.set_status(admin_context, user.WARNING, reason) - msg = {'user': user_id, 'notification': reason} - self.notifier.info('billing.notify', msg) + user = self.rpc_client.settle_account( + admin_context, user_id, task=self.NOTIFY) + user_obj = user_mod.User.from_dict(user) try: db_api.job_delete( - admin_context, self._generate_job_id(user.id, 'notify')) + admin_context, self._generate_job_id(user_id, self.NOTIFY)) except exception.NotFound as e: LOG.warn(_("Failed in deleting job: %s") % six.text_type(e)) - self.update_user_job(user) - bilean_lock.user_lock_release(user_id, engine_id=self.engine_id) + self.update_jobs(user_obj) def _daily_task(self, user_id): - res = bilean_lock.user_lock_acquire(user_id, self.engine_id) - if not res: - LOG.error(_LE('Failed grabbing the lock for user %s'), user_id) - return - admin_context = bilean_context.get_admin_context() - user = user_mod.User.load(admin_context, user_id=user_id) - user.do_bill(admin_context) - - try: - db_api.job_delete( - admin_context, self._generate_job_id(user.id, 'daily')) - except exception.NotFound as e: - LOG.warn(_("Failed in deleting job: %s") % six.text_type(e)) - self.update_user_job(user) - - bilean_lock.user_lock_release(user_id, engine_id=self.engine_id) + user = self.rpc_client.settle_account( + admin_context, user_id, task=self.DAILY) + user_obj = user_mod.User.from_dict(user) + self.update_jobs(user_obj) def _freeze_task(self, user_id): - res = bilean_lock.user_lock_acquire(user_id, self.engine_id) - if not res: - LOG.error(_LE('Failed grabbing the lock for user %s'), user_id) - return - admin_context = bilean_context.get_admin_context() - user = user_mod.User.load(admin_context, user_id=user_id) - user.do_bill(admin_context) - + user = self.rpc_client.settle_account( + admin_context, user_id, task=self.FREEZE) + user_obj = user_mod.User.from_dict(user) try: db_api.job_delete( - admin_context, self._generate_job_id(user.id, 'freeze')) + admin_context, self._generate_job_id(user_id, self.FREEZE)) except exception.NotFound as e: LOG.warn(_("Failed in deleting job: %s") % six.text_type(e)) - self.update_user_job(user) - - bilean_lock.user_lock_release(user_id, engine_id=self.engine_id) + self.update_jobs(user_obj) def _add_notify_job(self, user): if not user.rate: @@ -247,11 +219,11 @@ class BileanScheduler(object): run_date = timeutils.utcnow() + timedelta(seconds=notify_seconds) job_params = {'run_date': run_date} job_id = self._generate_job_id(user.id, self.NOTIFY) - self.add_job(self._notify_task, job_id, **job_params) + self._add_job(self._notify_task, job_id, **job_params) # Save job to database job = {'id': job_id, 'job_type': self.NOTIFY, - 'engine_id': self.engine_id, + 'scheduler_id': self.scheduler_id, 'parameters': {'run_date': utils.format_time(run_date)}} admin_context = bilean_context.get_admin_context() db_api.job_create(admin_context, job) @@ -263,11 +235,11 @@ class BileanScheduler(object): run_date = timeutils.utcnow() + timedelta(seconds=total_seconds) job_params = {'run_date': run_date} job_id = self._generate_job_id(user.id, self.FREEZE) - self.add_job(self._freeze_task, job_id, **job_params) + self._add_job(self._freeze_task, job_id, **job_params) # Save job to database job = {'id': job_id, 'job_type': self.FREEZE, - 'engine_id': self.engine_id, + 'scheduler_id': self.scheduler_id, 'parameters': {'run_date': utils.format_time(run_date)}} admin_context = bilean_context.get_admin_context() db_api.job_create(admin_context, job) @@ -277,47 +249,17 @@ class BileanScheduler(object): job_id = self._generate_job_id(user.id, self.DAILY) job_params = {'hour': random.randint(0, 23), 'minute': random.randint(0, 59)} - self.add_job(self._daily_task, job_id, trigger_type='cron', - **job_params) + self._add_job(self._daily_task, job_id, + trigger_type='cron', **job_params) # Save job to database job = {'id': job_id, 'job_type': self.DAILY, - 'engine_id': self.engine_id, + 'scheduler_id': self.scheduler_id, 'parameters': job_params} admin_context = bilean_context.get_admin_context() db_api.job_create(admin_context, job) return True - def update_user_job(self, user): - """Update user's billing job""" - # Delete all jobs except daily job - admin_context = bilean_context.get_admin_context() - for job_type in self.NOTIFY, self.FREEZE: - job_id = self._generate_job_id(user.id, job_type) - try: - if self.is_exist(job_id): - self.remove_job(job_id) - db_api.job_delete(admin_context, job_id) - except Exception as e: - LOG.warn(_("Failed in deleting job: %s") % six.text_type(e)) - - if user.status == user.ACTIVE: - self._add_notify_job(user) - elif user.status == user.WARNING: - self._add_freeze_job(user) - - def delete_user_jobs(self, user): - """Delete all jobs related the specific user.""" - admin_context = bilean_context.get_admin_context() - for job_type in self.job_types: - job_id = self._generate_job_id(user.id, job_type) - try: - if self.is_exist(job_id): - self.remove_job(job_id) - db_api.job_delete(admin_context, job_id) - except Exception as e: - LOG.warn(_("Failed in deleting job: %s") % six.text_type(e)) - def _generate_timer(self): """Generate a random timer include hour and minute.""" hour = random.randint(0, 23) @@ -328,6 +270,36 @@ class BileanScheduler(object): """Generate job id by given user_id and job type""" return "%s-%s" % (job_type, user_id) + def update_jobs(self, user): + """Update user's billing job""" + # Delete all jobs except daily job + admin_context = bilean_context.get_admin_context() + for job_type in self.NOTIFY, self.FREEZE: + job_id = self._generate_job_id(user.id, job_type) + try: + if self._is_exist(job_id): + self._remove_job(job_id) + db_api.job_delete(admin_context, job_id) + except Exception as e: + LOG.warn(_("Failed in deleting job: %s") % six.text_type(e)) + + if user.status == user.ACTIVE: + self._add_notify_job(user) + elif user.status == user.WARNING: + self._add_freeze_job(user) + + def delete_jobs(self, user): + """Delete all jobs related the specific user.""" + admin_context = bilean_context.get_admin_context() + for job_type in self.job_types: + job_id = self._generate_job_id(user.id, job_type) + try: + if self._is_exist(job_id): + self._remove_job(job_id) + db_api.job_delete(admin_context, job_id) + except Exception as e: + LOG.warn(_("Failed in deleting job: %s") % six.text_type(e)) + def list_opts(): yield scheduler_group.name, scheduler_opts diff --git a/bilean/scheduler/service.py b/bilean/scheduler/service.py new file mode 100644 index 0000000..a92c623 --- /dev/null +++ b/bilean/scheduler/service.py @@ -0,0 +1,86 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import six +import socket + +from oslo_log import log as logging +import oslo_messaging +from oslo_service import service + +from bilean.common import consts +from bilean.common.i18n import _LE +from bilean.common.i18n import _LI +from bilean.common import messaging as rpc_messaging +from bilean.engine import user as user_mod +from bilean.scheduler import cron_scheduler + +LOG = logging.getLogger(__name__) + + +class SchedulerService(service.Service): + + def __init__(self, host, topic, manager=None, context=None): + super(SchedulerService, self).__init__() + self.host = host + self.topic = topic + + self.scheduler_id = None + self.scheduler = None + self.target = None + self._rpc_server = None + + def start(self): + self.scheduler_id = socket.gethostname() + + self.scheduler = cron_scheduler.CronScheduler( + scheduler_id=self.scheduler_id) + LOG.info(_LI("Starting billing scheduler")) + self.scheduler.init_scheduler() + self.scheduler.start() + + LOG.info(_LI("Starting rpc server for bilean scheduler service")) + self.target = oslo_messaging.Target(version=consts.RPC_API_VERSION, + server=self.scheduler_id, + topic=self.topic) + self._rpc_server = rpc_messaging.get_rpc_server(self.target, self) + self._rpc_server.start() + + super(SchedulerService, self).start() + + def _stop_rpc_server(self): + # Stop RPC connection to prevent new requests + LOG.info(_LI("Stopping scheduler service...")) + try: + self._rpc_server.stop() + self._rpc_server.wait() + LOG.info(_LI('Scheduler service stopped successfully')) + except Exception as ex: + LOG.error(_LE('Failed to stop scheduler service: %s'), + six.text_type(ex)) + + def stop(self): + self._stop_rpc_server() + + LOG.info(_LI("Stopping billing scheduler")) + self.scheduler.stop() + + super(SchedulerService, self).stop() + + def update_jobs(self, ctxt, user): + user_obj = user_mod.User.from_dict(user) + self.scheduler.update_jobs(user_obj) + + def delete_jobs(self, ctxt, user): + user_obj = user_mod.User.from_dict(user) + self.scheduler.delete_jobs(user_obj) diff --git a/bin/bilean-manage b/bin/bilean-manage deleted file mode 100755 index 9cb9c93..0000000 --- a/bin/bilean-manage +++ /dev/null @@ -1,27 +0,0 @@ -#!/usr/bin/env python -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import sys - -POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), - os.pardir, - os.pardir)) -if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'bilean', '__init__.py')): - sys.path.insert(0, POSSIBLE_TOPDIR) - -from bilean.cmd import manage - -manage.main() diff --git a/setup.cfg b/setup.cfg index b663adf..66a3c1f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,11 +1,10 @@ [metadata] name = bilean -version = 2015.2 summary = OpenStack Billing Service description-file = README.rst author = OpenStack -author-email = dongbing.lv@kylin-cloud.com +author-email = openstack-dev@lists.openstack.org home-page = http://www.openstack.org/ classifier = Environment :: OpenStack @@ -16,23 +15,26 @@ classifier = Programming Language :: Python Programming Language :: Python :: 2 Programming Language :: Python :: 2.7 - Programming Language :: Python :: 2.6 + Programming Language :: Python :: 3 + Programming Language :: Python :: 3.4 [files] packages = bilean -scripts = - bin/bilean-api - bin/bilean-engine - bin/bilean-notification - bin/bilean-manage [entry_points] +console_scripts = + bilean-api = bilean.cmd.api:main + bilean-engine = bilean.cmd.engine:main + bilean-scheduler = bilean.cmd.scheduler:main + bilean-notification = bilean.cmd.notification:main + bilean-manage = bilean.cmd.manage:main + oslo.config.opts = bilean.common.config = bilean.common.config:list_opts bilean.common.wsgi = bilean.common.wsgi:list_opts bilean.api.middleware.ssl = bilean.api.middleware.ssl:list_opts - bilean.engine.scheduler = bilean.engine.scheduler:list_opts + bilean.scheduler.cron_scheduler = bilean.scheduler.cron_scheduler:list_opts bilean.notification.converter = bilean.notification.converter:list_opts bilean.drivers = diff --git a/tox.ini b/tox.ini index 8cefa89..1a9781e 100644 --- a/tox.ini +++ b/tox.ini @@ -34,7 +34,7 @@ commands = {posargs} commands = python setup.py build_sphinx [flake8] -ignore = E731 +ignore = E731, E402 exclude = .venv,.git,.tox,cover,dist,*lib/python*,*egg,tools,build max-complexity=20