diff --git a/quark/api/extensions/jobs.py b/quark/api/extensions/jobs.py new file mode 100644 index 0000000..eb8a160 --- /dev/null +++ b/quark/api/extensions/jobs.py @@ -0,0 +1,126 @@ +# Copyright (c) 2016 Rackspace Hosting Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from neutron.api import extensions +from neutron import manager +from neutron import wsgi +from neutron_lib import exceptions as n_exc +from oslo_log import log as logging +import webob + +RESOURCE_NAME = 'job' +RESOURCE_COLLECTION = RESOURCE_NAME + "s" +EXTENDED_ATTRIBUTES_2_0 = { + RESOURCE_COLLECTION: { + "completed": {"allow_post": False, "is_visible": True, + "default": False}} +} + +attr_dict = EXTENDED_ATTRIBUTES_2_0[RESOURCE_COLLECTION] +attr_dict[RESOURCE_NAME] = {'allow_post': True, + 'allow_put': True, + 'is_visible': True} + +LOG = logging.getLogger(__name__) + + +class JobsController(wsgi.Controller): + + def __init__(self, plugin): + self._resource_name = RESOURCE_NAME + self._plugin = plugin + + def index(self, request): + context = request.context + return {"jobs": self._plugin.get_jobs(context, **request.GET)} + + def show(self, request, id): + context = request.context + try: + return {"job": self._plugin.get_job(context, id)} + except n_exc.NotFound as e: + raise webob.exc.HTTPNotFound(e) + + def create(self, request, body=None): + context = request.context + body = self._deserialize(request.body, request.get_content_type()) + try: + return {"job": self._plugin.create_job(context, body)} + except n_exc.NotFound as e: + raise webob.exc.HTTPNotFound(e) + except n_exc.Conflict as e: + raise webob.exc.HTTPConflict(e) + except n_exc.BadRequest as e: + raise webob.exc.HTTPBadRequest(e) + + def update(self, request, id, body=None): + context = request.context + body = self._deserialize(request.body, request.get_content_type()) + try: + return {"job": self._plugin.update_job(context, id, body)} + except n_exc.NotFound as e: + raise webob.exc.HTTPNotFound(e) + except n_exc.BadRequest as e: + raise webob.exc.HTTPBadRequest(e) + + def delete(self, request, id): + context = request.context + try: + return self._plugin.delete_job(context, id) + except n_exc.NotFound as e: + raise webob.exc.HTTPNotFound(e) + except n_exc.BadRequest as e: + raise webob.exc.HTTPBadRequest(e) + + +class Jobs(extensions.ExtensionDescriptor): + """Jobs support.""" + @classmethod + def get_name(cls): + return "Asyncronous jobs for a tenant" + + @classmethod + def get_alias(cls): + return RESOURCE_COLLECTION + + @classmethod + def get_description(cls): + return "Provide a way to track asyncronous jobs" + + @classmethod + def get_namespace(cls): + return ("http://docs.openstack.org/network/ext/" + "ip_addresses/api/v2.0") + + @classmethod + def get_updated(cls): + return "2016-05-15T10:00:00-00:00" + + def get_extended_resources(self, version): + if version == "2.0": + return EXTENDED_ATTRIBUTES_2_0 + else: + return {} + + @classmethod + def get_resources(cls): + """Returns Ext Resources.""" + job_controller = JobsController( + manager.NeutronManager.get_plugin()) + resources = [] + resources.append(extensions.ResourceExtension( + Jobs.get_alias(), + job_controller)) + return resources diff --git a/quark/db/api.py b/quark/db/api.py index c80912e..48c5708 100644 --- a/quark/db/api.py +++ b/quark/db/api.py @@ -92,7 +92,7 @@ def _model_query(context, model, filters, fields=None): model_filters = [] eq_filters = ["address", "cidr", "deallocated", "ip_version", "service", "mac_address_range_id", "transaction_id", "lock_id", - "address_type"] + "address_type", "completed"] in_filters = ["device_id", "device_owner", "group_id", "id", "mac_address", "name", "network_id", "segment_id", "subnet_id", "used_by_tenant_id", "version"] @@ -1148,3 +1148,33 @@ def segment_allocation_range_create(context, **sa_range_dict): def segment_allocation_range_delete(context, sa_range): context.session.delete(sa_range) + + +@scoped +def async_transaction_find(context, lock_mode=False, **filters): + query = context.session.query(models.AsyncTransactions) + if lock_mode: + query = query.with_lockmode("update") + + model_filters = _model_query( + context, models.AsyncTransactions, filters) + + query = query.filter(*model_filters) + return query + + +def async_transaction_create(context, **transaction_dict): + tx = models.AsyncTransactions() + tx.update(transaction_dict) + context.session.add(tx) + return tx + + +def async_transaction_update(context, transaction, **kwargs): + transaction.update(kwargs) + context.session.add(transaction) + return transaction + + +def async_transaction_delete(context, transaction): + context.session.delete(transaction) diff --git a/quark/db/migration/alembic/versions/271cce54e15b_add_async_transactions_table.py b/quark/db/migration/alembic/versions/271cce54e15b_add_async_transactions_table.py new file mode 100644 index 0000000..02a632a --- /dev/null +++ b/quark/db/migration/alembic/versions/271cce54e15b_add_async_transactions_table.py @@ -0,0 +1,32 @@ +"""Add async transactions table + +Revision ID: 271cce54e15b +Revises: 2a116b962c95 +Create Date: 2016-06-15 09:24:29.941684 + +""" + +# revision identifiers, used by Alembic. +revision = '271cce54e15b' +down_revision = '2a116b962c95' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.create_table('quark_async_transactions', + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('id', sa.String(36), nullable=False), + sa.Column('tenant_id', sa.String(255), nullable=False), + sa.Column('action', sa.String(255), nullable=False), + sa.Column('completed', sa.Boolean()), + sa.PrimaryKeyConstraint('id'), + mysql_engine='InnoDB') + op.create_index(op.f('ix_quark_async_transactions_tenant_id'), + 'quark_async_transactions', ['tenant_id'], + unique=False) + + +def downgrade(): + op.drop_table('quark_async_transactions') diff --git a/quark/db/models.py b/quark/db/models.py index 0e75363..2575492 100644 --- a/quark/db/models.py +++ b/quark/db/models.py @@ -589,3 +589,10 @@ class SegmentAllocationRange(BASEV2, models.HasId): last_id = sa.Column(sa.BigInteger(), nullable=False) do_not_use = sa.Column(sa.Boolean(), default=False, nullable=False) + + +class AsyncTransactions(BASEV2, models.HasId): + __tablename__ = "quark_async_transactions" + tenant_id = sa.Column(sa.String(255), index=True) + action = sa.Column(sa.String(255)) + completed = sa.Column(sa.Boolean(), default=False) diff --git a/quark/exceptions.py b/quark/exceptions.py index 150179e..8e3da4e 100644 --- a/quark/exceptions.py +++ b/quark/exceptions.py @@ -244,3 +244,7 @@ class CannotAddMoreIPsToPort(n_exc.OverQuota): class CannotCreateMoreSharedIPs(n_exc.OverQuota): message = _("Cannot create more shared IPs on selected network") + + +class JobNotFound(n_exc.NotFound): + message = _("Job %(job_id)s not found") diff --git a/quark/plugin.py b/quark/plugin.py index 3ac3095..d32dff1 100644 --- a/quark/plugin.py +++ b/quark/plugin.py @@ -30,6 +30,7 @@ from quark import ip_availability from quark.plugin_modules import floating_ips from quark.plugin_modules import ip_addresses from quark.plugin_modules import ip_policies +from quark.plugin_modules import jobs from quark.plugin_modules import mac_address_ranges from quark.plugin_modules import networks from quark.plugin_modules import ports @@ -131,7 +132,7 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2, "networks_quark", "router", "ip_availabilities", "ports_quark", "floatingip", "segment_allocation_ranges", - "scalingip"] + "scalingip", "jobs"] def __init__(self): LOG.info("Starting quark plugin") @@ -499,3 +500,25 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2, fields=fields, sorts=sorts, limit=limit, marker=marker, page_reverse=page_reverse) + + @sessioned + def get_jobs(self, context, **filters): + return jobs.get_jobs(context, **filters) + + @sessioned + def get_job(self, context, id): + return jobs.get_job(context, id) + + @sessioned + def create_job(self, context, job): + self._fix_missing_tenant_id(context, job['job']) + return jobs.create_job(context, job) + + @sessioned + def update_job(self, context, id, job): + self._fix_missing_tenant_id(context, job['job']) + return jobs.update_job(context, id, job) + + @sessioned + def delete_job(self, context, id): + return jobs.delete_job(context, id) diff --git a/quark/plugin_modules/jobs.py b/quark/plugin_modules/jobs.py new file mode 100644 index 0000000..8fe6db7 --- /dev/null +++ b/quark/plugin_modules/jobs.py @@ -0,0 +1,89 @@ +# Copyright 2016 Rackspace Hosting Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron_lib import exceptions as n_exc +from oslo_config import cfg +from oslo_log import log as logging + +from quark.db import api as db_api +from quark import exceptions as q_exc +from quark import plugin_views as v + +CONF = cfg.CONF +LOG = logging.getLogger(__name__) + + +def get_jobs(context, **filters): + LOG.info("get_jobs for tenant %s" % context.tenant_id) + if not filters: + filters = {} + jobs = db_api.async_transaction_find(context, scope=db_api.ALL, **filters) + return [v._make_job_dict(ip) for ip in jobs] + + +def get_job(context, id): + LOG.info("get_job %s for tenant %s" % (id, context.tenant_id)) + filters = {} + job = db_api.async_transaction_find(context, id=id, scope=db_api.ONE, + **filters) + if not job: + raise q_exc.JobNotFound(job_id=id) + return v._make_job_dict(job) + + +def create_job(context, body): + LOG.info("create_job for tenant %s" % context.tenant_id) + + if not context.is_admin: + raise n_exc.NotAuthorized() + job = body.get('job') + if not job: + raise n_exc.BadRequest(resource="job", msg="Invalid request body.") + with context.session.begin(): + new_job = db_api.async_transaction_create(context, **job) + return v._make_job_dict(new_job) + + +def update_job(context, id, body): + LOG.info("update_job %s for tenant %s" % (id, context.tenant_id)) + + if not context.is_admin: + raise n_exc.NotAuthorized() + job_update = body.get('job') + if not job_update: + raise n_exc.BadRequest(resource="job", msg="Invalid request body.") + job = db_api.async_transaction_find(context, id=id, scope=db_api.ONE) + if not job: + raise q_exc.JobNotFound(job_id=id) + job_mod = db_api.async_transaction_update(context, job, **job_update) + return v._make_job_dict(job_mod) + + +def delete_job(context, id, **filters): + """Delete an ip address. + + : param context: neutron api request context + : param id: UUID representing the ip address to delete. + """ + LOG.info("delete_ip_address %s for tenant %s" % (id, context.tenant_id)) + + if not context.is_admin: + raise n_exc.NotAuthorized() + with context.session.begin(): + job = db_api.async_transaction_find(context, id=id, scope=db_api.ONE, + **filters) + if not job: + raise q_exc.JobNotFound(job_id=id) + db_api.async_transaction_delete(context, job) diff --git a/quark/plugin_views.py b/quark/plugin_views.py index cbebfac..d98137b 100644 --- a/quark/plugin_views.py +++ b/quark/plugin_views.py @@ -355,3 +355,11 @@ def _make_scaling_ip_dict(flip): "tenant_id": flip.get("used_by_tenant_id"), "status": flip.get("status"), "ports": ports} + + +def _make_job_dict(job): + return {"id": job.get('id'), + "action": job.get('action'), + "completed": job.get('completed'), + "tenant_id": job.get('tenant_id'), + "created_at": job.get('created_at')} diff --git a/quark/tests/functional/plugin_modules/test_jobs.py b/quark/tests/functional/plugin_modules/test_jobs.py new file mode 100644 index 0000000..09936d8 --- /dev/null +++ b/quark/tests/functional/plugin_modules/test_jobs.py @@ -0,0 +1,207 @@ +# Copyright 2016 Rackspace Hosting Inc +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for# the specific language governing permissions and limitations +# under the License. +from oslo_log import log as logging + +from neutron_lib import exceptions as n_exc + +from quark import exceptions as q_exc +from quark.plugin_modules import jobs as job_api +from quark.tests.functional.base import BaseFunctionalTest + +LOG = logging.getLogger(__name__) + + +class QuarkJobs(BaseFunctionalTest): + def setUp(self): + super(QuarkJobs, self).setUp() + self.action = "test action" + self.tenant_id = "test_tenant" + self.tenant_id2 = "test_tenant2" + + def test_create_job(self): + job_body = dict(tenant_id=self.tenant_id, action=self.action, + completed=False) + job_body = dict(job=job_body) + job = job_api.create_job(self.admin_context, job_body) + self.assertIsNotNone(job) + self.assertFalse(job['completed']) + self.assertEqual(self.tenant_id, job['tenant_id']) + self.assertEqual(self.action, job['action']) + + def test_create_job_fail_non_admin(self): + job_body = dict(tenant_id=self.tenant_id, action=self.action, + completed=False) + job_body = dict(job=job_body) + with self.assertRaises(n_exc.NotAuthorized): + job_api.create_job(self.context, job_body) + + def test_get_jobs(self): + job_body = dict(tenant_id=self.tenant_id, action=self.action, + completed=False) + job_body = dict(job=job_body) + job1 = job_api.create_job(self.admin_context, job_body) + self.assertIsNotNone(job1) + + job_body = dict(tenant_id=self.tenant_id2, action=self.action, + completed=True) + job_body = dict(job=job_body) + job2 = job_api.create_job(self.admin_context, job_body) + self.assertIsNotNone(job2) + + jobs = job_api.get_job(self.admin_context, job1['id']) + self.assertFalse(type(jobs) in [list, tuple]) + job = jobs + self.assertFalse(job['completed']) + self.assertEqual(self.tenant_id, job['tenant_id']) + self.assertEqual(self.action, job['action']) + + job = job_api.get_job(self.admin_context, job2['id']) + self.assertTrue(job['completed']) + self.assertEqual(self.tenant_id2, job['tenant_id']) + self.assertEqual(self.action, job['action']) + + with self.assertRaises(q_exc.JobNotFound): + job_api.get_job(self.admin_context, 'derp') + + jobs = job_api.get_jobs(self.admin_context) + self.assertTrue(type(jobs) in [list, tuple]) + self.assertEqual(2, len(jobs)) + + jobs = job_api.get_jobs(self.admin_context, completed=True) + self.assertTrue(type(jobs) in [list, tuple]) + self.assertEqual(1, len(jobs)) + + jobs = job_api.get_jobs(self.admin_context, completed=False) + self.assertTrue(type(jobs) in [list, tuple]) + self.assertEqual(1, len(jobs)) + + jobs = job_api.get_jobs(self.admin_context, completed='hello') + self.assertTrue(type(jobs) in [list, tuple]) + self.assertEqual(0, len(jobs)) + + jobs = job_api.get_jobs(self.admin_context, tenant_id=self.tenant_id) + self.assertTrue(type(jobs) in [list, tuple]) + self.assertEqual(1, len(jobs)) + + jobs = job_api.get_jobs(self.admin_context, tenant_id='derp') + self.assertTrue(type(jobs) in [list, tuple]) + self.assertEqual(0, len(jobs)) + + def test_get_job_different_non_admin(self): + job_body = dict(tenant_id=self.context.tenant_id, action=self.action, + completed=False) + job_body = dict(job=job_body) + job1 = job_api.create_job(self.admin_context, job_body) + self.assertIsNotNone(job1) + + job_body = dict(tenant_id=self.tenant_id2, action=self.action, + completed=True) + job_body = dict(job=job_body) + job2 = job_api.create_job(self.admin_context, job_body) + self.assertIsNotNone(job2) + + jobs = job_api.get_jobs(self.context) + self.assertTrue(type(jobs) in [list, tuple]) + + self.assertEqual(1, len(jobs)) + self.assertEqual(self.context.tenant_id, jobs[0]['tenant_id']) + + def test_update_jobs(self): + update_body = dict(completed=True) + update_body = dict(job=update_body) + + with self.assertRaises(q_exc.JobNotFound): + job_api.update_job(self.admin_context, 'derp', update_body) + + job_body = dict(tenant_id=self.tenant_id, action=self.action, + completed=False) + job_body = dict(job=job_body) + job1 = job_api.create_job(self.admin_context, job_body) + self.assertIsNotNone(job1) + + job = job_api.get_job(self.admin_context, job1['id']) + self.assertFalse(job['completed']) + + updated_job = job_api.update_job(self.admin_context, job1['id'], + update_body) + self.assertTrue(updated_job['completed']) + + job = job_api.get_job(self.admin_context, job1['id']) + self.assertTrue(job['completed']) + + def test_update_job_fail_non_admin(self): + update_body = dict(completed=True) + update_body = dict(job=update_body) + + job_body = dict(tenant_id=self.tenant_id, action=self.action, + completed=False) + job_body = dict(job=job_body) + job1 = job_api.create_job(self.admin_context, job_body) + self.assertIsNotNone(job1) + + job = job_api.get_job(self.admin_context, job1['id']) + self.assertFalse(job['completed']) + + with self.assertRaises(n_exc.NotAuthorized): + job_api.update_job(self.context, job1['id'], update_body) + + updated_job = job_api.update_job(self.admin_context, job1['id'], + update_body) + self.assertTrue(updated_job['completed']) + + job = job_api.get_job(self.admin_context, job1['id']) + self.assertTrue(job['completed']) + + def test_delete_jobs(self): + with self.assertRaises(q_exc.JobNotFound): + job_api.delete_job(self.admin_context, 'derp') + + job_body = dict(tenant_id=self.tenant_id, action=self.action, + completed=False) + job_body = dict(job=job_body) + job1 = job_api.create_job(self.admin_context, job_body) + self.assertIsNotNone(job1) + + job = job_api.get_job(self.admin_context, job1['id']) + self.assertFalse(job['completed']) + + job_api.delete_job(self.admin_context, job1['id']) + + with self.assertRaises(q_exc.JobNotFound): + job_api.get_job(self.admin_context, job1['id']) + + with self.assertRaises(q_exc.JobNotFound): + job_api.delete_job(self.admin_context, job1['id']) + + def test_delete_job_fail_non_admin(self): + with self.assertRaises(n_exc.NotAuthorized): + job_api.delete_job(self.context, 'derp') + + job_body = dict(tenant_id=self.tenant_id, action=self.action, + completed=False) + job_body = dict(job=job_body) + job1 = job_api.create_job(self.admin_context, job_body) + self.assertIsNotNone(job1) + + job = job_api.get_job(self.admin_context, job1['id']) + self.assertFalse(job['completed']) + + with self.assertRaises(n_exc.NotAuthorized): + job_api.delete_job(self.context, job1['id']) + + job_api.delete_job(self.admin_context, job1['id']) + + with self.assertRaises(q_exc.JobNotFound): + job_api.get_job(self.context, job1['id']) diff --git a/quark/tools/async_worker.py b/quark/tools/async_worker.py new file mode 100644 index 0000000..1556be2 --- /dev/null +++ b/quark/tools/async_worker.py @@ -0,0 +1,149 @@ +import eventlet +eventlet.monkey_patch(socket=True, select=True, time=True) + +import sys +import time + +from oslo_config import cfg +from oslo_log import log as logging +import oslo_messaging as messaging +from oslo_service import service as common_service +from oslo_utils import excutils + +from neutron._i18n import _ +from neutron._i18n import _LE +from neutron.common import config +from neutron.common import rpc as n_rpc +from neutron.db import api as session +from neutron import service + +service_opts = [ + cfg.StrOpt('topic', + help=_('Topic for messaging to pub/sub to')), + cfg.StrOpt('transport_url', + help=_('Connection string for transport service')), + cfg.IntOpt('periodic_interval', + default=40, + help=_('Seconds between running periodic tasks')), + cfg.IntOpt('rpc_workers', + default=1, + help=_('Number of RPC worker processes for service')), + cfg.IntOpt('periodic_fuzzy_delay', + default=5, + help=_('Range of seconds to randomly delay when starting the ' + 'periodic task scheduler to reduce stampeding. ' + '(Disable by setting to 0)')), +] +CONF = cfg.CONF +CONF.register_opts(service_opts, "QUARK_ASYNC") +LOG = logging.getLogger(__name__) + + +class QuarkRpcTestCallback(object): + target = messaging.Target(version='1.0', namespace=None) + + def stuff(self, context, **kwargs): + return {"status": "okay"} + + +class QuarkAsyncPlugin(object): + def __init__(self): + pass + + def _setup_rpc(self): + self.endpoints = [QuarkRpcTestCallback()] + + def start_rpc_listeners(self): + """Configure all listeners here""" + self._setup_rpc() + self.conn = n_rpc.create_connection() + self.conn.create_consumer(CONF.QUARK_ASYNC.topic, self.endpoints, + fanout=False) + return self.conn.consume_in_threads() + + +def serve_rpc(): + + if cfg.CONF.QUARK_ASYNC.rpc_workers < 1: + cfg.CONF.set_override('rpc_workers', 1, "QUARK_ASYNC") + + try: + plugins = [QuarkAsyncPlugin()] + rpc = service.RpcWorker(plugins) + session.dispose() # probaby not needed, but maybe + launcher = common_service.ProcessLauncher(CONF, wait_interval=1.0) + launcher.launch_service(rpc, workers=CONF.QUARK_ASYNC.rpc_workers) + + return launcher + except Exception: + with excutils.save_and_reraise_exception(): + LOG.exception(_LE('Unrecoverable error: please check log for ' + 'details.')) + + +def start_api_and_rpc_workers(): + pool = eventlet.GreenPool() + + quark_rpc = serve_rpc() + pool.spawn(quark_rpc.wait) + + pool.waitall() + + +def boot_server(server_func): + # the configuration will be read into the cfg.CONF global data structure + config.init(sys.argv[1:]) + config.setup_logging() + config.set_config_defaults() + if not cfg.CONF.config_file: + sys.exit(_("ERROR: Unable to find configuration file via the default" + " search paths (~/.neutron/, ~/, /etc/neutron/, /etc/) and" + " the '--config-file' option!")) + try: + server_func() + except KeyboardInterrupt: + pass + except RuntimeError as e: + sys.exit(_("ERROR: %s") % e) + + +def main(): + boot_server(start_api_and_rpc_workers) + + +class QuarkRpcTestApi(object): + """This class is used for testing QuarkRpcTestCallback.""" + def __init__(self): + target = messaging.Target(topic=CONF.QUARK_ASYNC.topic) + self.client = n_rpc.get_client(target) + + def stuff(self, context): + cctxt = self.client.prepare(version='1.0') + return cctxt.call(context, 'stuff') + + +class QuarkAsyncTestContext(object): + """This class is used for testing QuarkRpcTestCallback.""" + def __init__(self): + self.time = time.ctime() + + def to_dict(self): + return {"application": "rpc-client", "time": time.ctime()} + + +def test_main(): + config.init(sys.argv[1:]) + config.setup_logging() + config.set_config_defaults() + if not cfg.CONF.config_file: + sys.exit(_("ERROR: Unable to find configuration file via the default" + " search paths (~/.neutron/, ~/, /etc/neutron/, /etc/) and" + " the '--config-file' option!")) + context = QuarkAsyncTestContext() # typically context is neutron context + client = QuarkRpcTestApi() + LOG.info(client.stuff(context)) + time.sleep(0) # necessary for preventing Timeout exceptions + + +if __name__ == "__main__": + main() diff --git a/setup.cfg b/setup.cfg index 8dc95c8..cd9b83e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -21,6 +21,8 @@ console_scripts = quark-db-manage = quark.db.migration.alembic.cli:main gunicorn-neutron-server = quark.gunicorn_server:main quark-agent = quark.agent.agent:main + quark-async-worker = quark.tools.async_worker:main + quark-async-tester = quark.tools.async_worker:test_main ip_availability = quark.ip_availability:main redis_sg_tool = quark.tools.redis_sg_tool:main null_routes = quark.tools.null_routes:main