Merge "Initial work on async worker for quark"

commit 4557dfee13
Jenkins 2016-06-23 01:05:26 +00:00, committed by Gerrit Code Review
11 changed files with 679 additions and 2 deletions

View File

@@ -0,0 +1,126 @@
# Copyright (c) 2016 Rackspace Hosting Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from neutron.api import extensions
from neutron import manager
from neutron import wsgi
from neutron_lib import exceptions as n_exc
from oslo_log import log as logging
import webob.exc
RESOURCE_NAME = 'job'
RESOURCE_COLLECTION = RESOURCE_NAME + "s"
EXTENDED_ATTRIBUTES_2_0 = {
RESOURCE_COLLECTION: {
"completed": {"allow_post": False, "is_visible": True,
"default": False}}
}
attr_dict = EXTENDED_ATTRIBUTES_2_0[RESOURCE_COLLECTION]
attr_dict[RESOURCE_NAME] = {'allow_post': True,
'allow_put': True,
'is_visible': True}
LOG = logging.getLogger(__name__)
class JobsController(wsgi.Controller):
def __init__(self, plugin):
self._resource_name = RESOURCE_NAME
self._plugin = plugin
def index(self, request):
context = request.context
return {"jobs": self._plugin.get_jobs(context, **request.GET)}
def show(self, request, id):
context = request.context
try:
return {"job": self._plugin.get_job(context, id)}
except n_exc.NotFound as e:
raise webob.exc.HTTPNotFound(e)
def create(self, request, body=None):
context = request.context
body = self._deserialize(request.body, request.get_content_type())
try:
return {"job": self._plugin.create_job(context, body)}
except n_exc.NotFound as e:
raise webob.exc.HTTPNotFound(e)
except n_exc.Conflict as e:
raise webob.exc.HTTPConflict(e)
except n_exc.BadRequest as e:
raise webob.exc.HTTPBadRequest(e)
def update(self, request, id, body=None):
context = request.context
body = self._deserialize(request.body, request.get_content_type())
try:
return {"job": self._plugin.update_job(context, id, body)}
except n_exc.NotFound as e:
raise webob.exc.HTTPNotFound(e)
except n_exc.BadRequest as e:
raise webob.exc.HTTPBadRequest(e)
def delete(self, request, id):
context = request.context
try:
return self._plugin.delete_job(context, id)
except n_exc.NotFound as e:
raise webob.exc.HTTPNotFound(e)
except n_exc.BadRequest as e:
raise webob.exc.HTTPBadRequest(e)
class Jobs(extensions.ExtensionDescriptor):
"""Jobs support."""
@classmethod
def get_name(cls):
return "Asyncronous jobs for a tenant"
@classmethod
def get_alias(cls):
return RESOURCE_COLLECTION
@classmethod
def get_description(cls):
return "Provide a way to track asyncronous jobs"
@classmethod
def get_namespace(cls):
return ("http://docs.openstack.org/network/ext/"
"ip_addresses/api/v2.0")
@classmethod
def get_updated(cls):
return "2016-05-15T10:00:00-00:00"
def get_extended_resources(self, version):
if version == "2.0":
return EXTENDED_ATTRIBUTES_2_0
else:
return {}
@classmethod
def get_resources(cls):
"""Returns Ext Resources."""
job_controller = JobsController(
manager.NeutronManager.get_plugin())
resources = []
resources.append(extensions.ResourceExtension(
Jobs.get_alias(),
job_controller))
return resources
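
The controller above plugs the job resource into Neutron's standard v2.0 extension machinery, so the new endpoints behave like any other Neutron collection. A rough sketch of driving them over HTTP follows; the endpoint URL, port, and token are illustrative assumptions, while the "job"/"jobs" request and response envelopes mirror the controller code.

import requests  # hedged sketch, not part of this change

NEUTRON = "http://127.0.0.1:9696/v2.0"          # assumed Neutron endpoint
HEADERS = {"X-Auth-Token": "ADMIN_TOKEN",        # assumed admin token
           "Content-Type": "application/json"}

# Create a job (admin only); the body must be wrapped in a "job" key.
resp = requests.post(NEUTRON + "/jobs",
                     json={"job": {"action": "update port", "completed": False}},
                     headers=HEADERS)
job = resp.json()["job"]

# List all jobs visible to the caller.
print(requests.get(NEUTRON + "/jobs", headers=HEADERS).json()["jobs"])

# Update and delete a single job by id.
requests.put(NEUTRON + "/jobs/%s" % job["id"],
             json={"job": {"completed": True}}, headers=HEADERS)
requests.delete(NEUTRON + "/jobs/%s" % job["id"], headers=HEADERS)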

View File

@@ -92,7 +92,7 @@ def _model_query(context, model, filters, fields=None):
model_filters = []
eq_filters = ["address", "cidr", "deallocated", "ip_version", "service",
"mac_address_range_id", "transaction_id", "lock_id",
"address_type"]
"address_type", "completed"]
in_filters = ["device_id", "device_owner", "group_id", "id", "mac_address",
"name", "network_id", "segment_id", "subnet_id",
"used_by_tenant_id", "version"]
@@ -1148,3 +1148,33 @@ def segment_allocation_range_create(context, **sa_range_dict):
def segment_allocation_range_delete(context, sa_range):
context.session.delete(sa_range)
@scoped
def async_transaction_find(context, lock_mode=False, **filters):
query = context.session.query(models.AsyncTransactions)
if lock_mode:
query = query.with_lockmode("update")
model_filters = _model_query(
context, models.AsyncTransactions, filters)
query = query.filter(*model_filters)
return query
def async_transaction_create(context, **transaction_dict):
tx = models.AsyncTransactions()
tx.update(transaction_dict)
context.session.add(tx)
return tx
def async_transaction_update(context, transaction, **kwargs):
transaction.update(kwargs)
context.session.add(transaction)
return transaction
def async_transaction_delete(context, transaction):
context.session.delete(transaction)
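
These helpers follow the existing quark db_api conventions: _model_query builds the filter list ("completed" is now accepted as an equality filter), and the scoped decorator lets callers ask for a single row or for all matches. A minimal sketch of how the plugin layer is expected to drive them, assuming a normal Neutron request context:

from quark.db import api as db_api


def track_transaction(context):
    # Create the row inside the caller's session/transaction.
    with context.session.begin():
        tx = db_api.async_transaction_create(
            context, tenant_id=context.tenant_id,
            action="bulk ip update", completed=False)

    # Fetch one record by id, or list everything still pending.
    tx = db_api.async_transaction_find(context, id=tx.id, scope=db_api.ONE)
    pending = db_api.async_transaction_find(context, completed=False,
                                            scope=db_api.ALL)

    # Flag it as finished, then clean it up.
    db_api.async_transaction_update(context, tx, completed=True)
    db_api.async_transaction_delete(context, tx)
    return pending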

View File

@@ -0,0 +1,32 @@
"""Add async transactions table
Revision ID: 271cce54e15b
Revises: 2a116b962c95
Create Date: 2016-06-15 09:24:29.941684
"""
# revision identifiers, used by Alembic.
revision = '271cce54e15b'
down_revision = '2a116b962c95'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.create_table('quark_async_transactions',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('id', sa.String(36), nullable=False),
sa.Column('tenant_id', sa.String(255), nullable=False),
sa.Column('action', sa.String(255), nullable=False),
sa.Column('completed', sa.Boolean()),
sa.PrimaryKeyConstraint('id'),
mysql_engine='InnoDB')
op.create_index(op.f('ix_quark_async_transactions_tenant_id'),
'quark_async_transactions', ['tenant_id'],
unique=False)
def downgrade():
op.drop_table('quark_async_transactions')
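
With this revision in place, the quark_async_transactions table and its tenant_id index are created through the usual migration workflow; assuming quark-db-manage (registered in setup.cfg at the end of this change) forwards to Alembic's standard commands, "quark-db-manage upgrade head" applies it and the downgrade path removes it again.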

View File

@@ -589,3 +589,10 @@ class SegmentAllocationRange(BASEV2, models.HasId):
last_id = sa.Column(sa.BigInteger(), nullable=False)
do_not_use = sa.Column(sa.Boolean(), default=False, nullable=False)
class AsyncTransactions(BASEV2, models.HasId):
__tablename__ = "quark_async_transactions"
tenant_id = sa.Column(sa.String(255), index=True)
action = sa.Column(sa.String(255))
completed = sa.Column(sa.Boolean(), default=False)

View File

@@ -244,3 +244,7 @@ class CannotAddMoreIPsToPort(n_exc.OverQuota):
class CannotCreateMoreSharedIPs(n_exc.OverQuota):
message = _("Cannot create more shared IPs on selected network")
class JobNotFound(n_exc.NotFound):
message = _("Job %(job_id)s not found")

View File

@@ -30,6 +30,7 @@ from quark import ip_availability
from quark.plugin_modules import floating_ips
from quark.plugin_modules import ip_addresses
from quark.plugin_modules import ip_policies
from quark.plugin_modules import jobs
from quark.plugin_modules import mac_address_ranges
from quark.plugin_modules import networks
from quark.plugin_modules import ports
@@ -131,7 +132,7 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
"networks_quark", "router",
"ip_availabilities", "ports_quark",
"floatingip", "segment_allocation_ranges",
"scalingip"]
"scalingip", "jobs"]
def __init__(self):
LOG.info("Starting quark plugin")
@@ -499,3 +500,25 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
fields=fields, sorts=sorts,
limit=limit, marker=marker,
page_reverse=page_reverse)
@sessioned
def get_jobs(self, context, **filters):
return jobs.get_jobs(context, **filters)
@sessioned
def get_job(self, context, id):
return jobs.get_job(context, id)
@sessioned
def create_job(self, context, job):
self._fix_missing_tenant_id(context, job['job'])
return jobs.create_job(context, job)
@sessioned
def update_job(self, context, id, job):
self._fix_missing_tenant_id(context, job['job'])
return jobs.update_job(context, id, job)
@sessioned
def delete_job(self, context, id):
return jobs.delete_job(context, id)
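
The plugin methods are thin delegations: the sessioned decorator supplies the database session and _fix_missing_tenant_id fills in tenant_id from the request context when the body omits it. A rough sketch of the expected call shapes (plugin and admin_context are assumed to come from a running Neutron/quark deployment):

def exercise_jobs(plugin, admin_context):
    # tenant_id may be left out of the body; it is defaulted from the context.
    body = {"job": {"action": "rebuild port associations", "completed": False}}
    job = plugin.create_job(admin_context, body)

    plugin.update_job(admin_context, job["id"], {"job": {"completed": True}})
    assert plugin.get_job(admin_context, job["id"])["completed"]
    return plugin.delete_job(admin_context, job["id"])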

View File

@@ -0,0 +1,89 @@
# Copyright 2016 Rackspace Hosting Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import exceptions as n_exc
from oslo_config import cfg
from oslo_log import log as logging
from quark.db import api as db_api
from quark import exceptions as q_exc
from quark import plugin_views as v
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
def get_jobs(context, **filters):
LOG.info("get_jobs for tenant %s" % context.tenant_id)
if not filters:
filters = {}
jobs = db_api.async_transaction_find(context, scope=db_api.ALL, **filters)
return [v._make_job_dict(job) for job in jobs]
def get_job(context, id):
LOG.info("get_job %s for tenant %s" % (id, context.tenant_id))
filters = {}
job = db_api.async_transaction_find(context, id=id, scope=db_api.ONE,
**filters)
if not job:
raise q_exc.JobNotFound(job_id=id)
return v._make_job_dict(job)
def create_job(context, body):
LOG.info("create_job for tenant %s" % context.tenant_id)
if not context.is_admin:
raise n_exc.NotAuthorized()
job = body.get('job')
if not job:
raise n_exc.BadRequest(resource="job", msg="Invalid request body.")
with context.session.begin():
new_job = db_api.async_transaction_create(context, **job)
return v._make_job_dict(new_job)
def update_job(context, id, body):
LOG.info("update_job %s for tenant %s" % (id, context.tenant_id))
if not context.is_admin:
raise n_exc.NotAuthorized()
job_update = body.get('job')
if not job_update:
raise n_exc.BadRequest(resource="job", msg="Invalid request body.")
job = db_api.async_transaction_find(context, id=id, scope=db_api.ONE)
if not job:
raise q_exc.JobNotFound(job_id=id)
job_mod = db_api.async_transaction_update(context, job, **job_update)
return v._make_job_dict(job_mod)
def delete_job(context, id, **filters):
"""Delete an ip address.
: param context: neutron api request context
: param id: UUID representing the ip address to delete.
"""
LOG.info("delete_ip_address %s for tenant %s" % (id, context.tenant_id))
if not context.is_admin:
raise n_exc.NotAuthorized()
with context.session.begin():
job = db_api.async_transaction_find(context, id=id, scope=db_api.ONE,
**filters)
if not job:
raise q_exc.JobNotFound(job_id=id)
db_api.async_transaction_delete(context, job)

View File

@@ -355,3 +355,11 @@ def _make_scaling_ip_dict(flip):
"tenant_id": flip.get("used_by_tenant_id"),
"status": flip.get("status"),
"ports": ports}
def _make_job_dict(job):
return {"id": job.get('id'),
"action": job.get('action'),
"completed": job.get('completed'),
"tenant_id": job.get('tenant_id'),
"created_at": job.get('created_at')}

View File

@@ -0,0 +1,207 @@
# Copyright 2016 Rackspace Hosting Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from neutron_lib import exceptions as n_exc
from quark import exceptions as q_exc
from quark.plugin_modules import jobs as job_api
from quark.tests.functional.base import BaseFunctionalTest
LOG = logging.getLogger(__name__)
class QuarkJobs(BaseFunctionalTest):
def setUp(self):
super(QuarkJobs, self).setUp()
self.action = "test action"
self.tenant_id = "test_tenant"
self.tenant_id2 = "test_tenant2"
def test_create_job(self):
job_body = dict(tenant_id=self.tenant_id, action=self.action,
completed=False)
job_body = dict(job=job_body)
job = job_api.create_job(self.admin_context, job_body)
self.assertIsNotNone(job)
self.assertFalse(job['completed'])
self.assertEqual(self.tenant_id, job['tenant_id'])
self.assertEqual(self.action, job['action'])
def test_create_job_fail_non_admin(self):
job_body = dict(tenant_id=self.tenant_id, action=self.action,
completed=False)
job_body = dict(job=job_body)
with self.assertRaises(n_exc.NotAuthorized):
job_api.create_job(self.context, job_body)
def test_get_jobs(self):
job_body = dict(tenant_id=self.tenant_id, action=self.action,
completed=False)
job_body = dict(job=job_body)
job1 = job_api.create_job(self.admin_context, job_body)
self.assertIsNotNone(job1)
job_body = dict(tenant_id=self.tenant_id2, action=self.action,
completed=True)
job_body = dict(job=job_body)
job2 = job_api.create_job(self.admin_context, job_body)
self.assertIsNotNone(job2)
jobs = job_api.get_job(self.admin_context, job1['id'])
self.assertFalse(type(jobs) in [list, tuple])
job = jobs
self.assertFalse(job['completed'])
self.assertEqual(self.tenant_id, job['tenant_id'])
self.assertEqual(self.action, job['action'])
job = job_api.get_job(self.admin_context, job2['id'])
self.assertTrue(job['completed'])
self.assertEqual(self.tenant_id2, job['tenant_id'])
self.assertEqual(self.action, job['action'])
with self.assertRaises(q_exc.JobNotFound):
job_api.get_job(self.admin_context, 'derp')
jobs = job_api.get_jobs(self.admin_context)
self.assertTrue(type(jobs) in [list, tuple])
self.assertEqual(2, len(jobs))
jobs = job_api.get_jobs(self.admin_context, completed=True)
self.assertTrue(type(jobs) in [list, tuple])
self.assertEqual(1, len(jobs))
jobs = job_api.get_jobs(self.admin_context, completed=False)
self.assertTrue(type(jobs) in [list, tuple])
self.assertEqual(1, len(jobs))
jobs = job_api.get_jobs(self.admin_context, completed='hello')
self.assertTrue(type(jobs) in [list, tuple])
self.assertEqual(0, len(jobs))
jobs = job_api.get_jobs(self.admin_context, tenant_id=self.tenant_id)
self.assertTrue(type(jobs) in [list, tuple])
self.assertEqual(1, len(jobs))
jobs = job_api.get_jobs(self.admin_context, tenant_id='derp')
self.assertTrue(type(jobs) in [list, tuple])
self.assertEqual(0, len(jobs))
def test_get_job_different_non_admin(self):
job_body = dict(tenant_id=self.context.tenant_id, action=self.action,
completed=False)
job_body = dict(job=job_body)
job1 = job_api.create_job(self.admin_context, job_body)
self.assertIsNotNone(job1)
job_body = dict(tenant_id=self.tenant_id2, action=self.action,
completed=True)
job_body = dict(job=job_body)
job2 = job_api.create_job(self.admin_context, job_body)
self.assertIsNotNone(job2)
jobs = job_api.get_jobs(self.context)
self.assertTrue(type(jobs) in [list, tuple])
self.assertEqual(1, len(jobs))
self.assertEqual(self.context.tenant_id, jobs[0]['tenant_id'])
def test_update_jobs(self):
update_body = dict(completed=True)
update_body = dict(job=update_body)
with self.assertRaises(q_exc.JobNotFound):
job_api.update_job(self.admin_context, 'derp', update_body)
job_body = dict(tenant_id=self.tenant_id, action=self.action,
completed=False)
job_body = dict(job=job_body)
job1 = job_api.create_job(self.admin_context, job_body)
self.assertIsNotNone(job1)
job = job_api.get_job(self.admin_context, job1['id'])
self.assertFalse(job['completed'])
updated_job = job_api.update_job(self.admin_context, job1['id'],
update_body)
self.assertTrue(updated_job['completed'])
job = job_api.get_job(self.admin_context, job1['id'])
self.assertTrue(job['completed'])
def test_update_job_fail_non_admin(self):
update_body = dict(completed=True)
update_body = dict(job=update_body)
job_body = dict(tenant_id=self.tenant_id, action=self.action,
completed=False)
job_body = dict(job=job_body)
job1 = job_api.create_job(self.admin_context, job_body)
self.assertIsNotNone(job1)
job = job_api.get_job(self.admin_context, job1['id'])
self.assertFalse(job['completed'])
with self.assertRaises(n_exc.NotAuthorized):
job_api.update_job(self.context, job1['id'], update_body)
updated_job = job_api.update_job(self.admin_context, job1['id'],
update_body)
self.assertTrue(updated_job['completed'])
job = job_api.get_job(self.admin_context, job1['id'])
self.assertTrue(job['completed'])
def test_delete_jobs(self):
with self.assertRaises(q_exc.JobNotFound):
job_api.delete_job(self.admin_context, 'derp')
job_body = dict(tenant_id=self.tenant_id, action=self.action,
completed=False)
job_body = dict(job=job_body)
job1 = job_api.create_job(self.admin_context, job_body)
self.assertIsNotNone(job1)
job = job_api.get_job(self.admin_context, job1['id'])
self.assertFalse(job['completed'])
job_api.delete_job(self.admin_context, job1['id'])
with self.assertRaises(q_exc.JobNotFound):
job_api.get_job(self.admin_context, job1['id'])
with self.assertRaises(q_exc.JobNotFound):
job_api.delete_job(self.admin_context, job1['id'])
def test_delete_job_fail_non_admin(self):
with self.assertRaises(n_exc.NotAuthorized):
job_api.delete_job(self.context, 'derp')
job_body = dict(tenant_id=self.tenant_id, action=self.action,
completed=False)
job_body = dict(job=job_body)
job1 = job_api.create_job(self.admin_context, job_body)
self.assertIsNotNone(job1)
job = job_api.get_job(self.admin_context, job1['id'])
self.assertFalse(job['completed'])
with self.assertRaises(n_exc.NotAuthorized):
job_api.delete_job(self.context, job1['id'])
job_api.delete_job(self.admin_context, job1['id'])
with self.assertRaises(q_exc.JobNotFound):
job_api.get_job(self.context, job1['id'])

quark/tools/async_worker.py (new file, 149 lines)
View File

@@ -0,0 +1,149 @@
import eventlet
eventlet.monkey_patch(socket=True, select=True, time=True)
import sys
import time
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_service import service as common_service
from oslo_utils import excutils
from neutron._i18n import _
from neutron._i18n import _LE
from neutron.common import config
from neutron.common import rpc as n_rpc
from neutron.db import api as session
from neutron import service
service_opts = [
cfg.StrOpt('topic',
help=_('Messaging topic to publish and subscribe to')),
cfg.StrOpt('transport_url',
help=_('Connection string for transport service')),
cfg.IntOpt('periodic_interval',
default=40,
help=_('Seconds between running periodic tasks')),
cfg.IntOpt('rpc_workers',
default=1,
help=_('Number of RPC worker processes for service')),
cfg.IntOpt('periodic_fuzzy_delay',
default=5,
help=_('Range of seconds to randomly delay when starting the '
'periodic task scheduler to reduce stampeding. '
'(Disable by setting to 0)')),
]
CONF = cfg.CONF
CONF.register_opts(service_opts, "QUARK_ASYNC")
LOG = logging.getLogger(__name__)
class QuarkRpcTestCallback(object):
target = messaging.Target(version='1.0', namespace=None)
def stuff(self, context, **kwargs):
return {"status": "okay"}
class QuarkAsyncPlugin(object):
def __init__(self):
pass
def _setup_rpc(self):
self.endpoints = [QuarkRpcTestCallback()]
def start_rpc_listeners(self):
"""Configure all listeners here"""
self._setup_rpc()
self.conn = n_rpc.create_connection()
self.conn.create_consumer(CONF.QUARK_ASYNC.topic, self.endpoints,
fanout=False)
return self.conn.consume_in_threads()
def serve_rpc():
if cfg.CONF.QUARK_ASYNC.rpc_workers < 1:
cfg.CONF.set_override('rpc_workers', 1, "QUARK_ASYNC")
try:
plugins = [QuarkAsyncPlugin()]
rpc = service.RpcWorker(plugins)
session.dispose()  # probably not needed, but maybe
launcher = common_service.ProcessLauncher(CONF, wait_interval=1.0)
launcher.launch_service(rpc, workers=CONF.QUARK_ASYNC.rpc_workers)
return launcher
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE('Unrecoverable error: please check log for '
'details.'))
def start_api_and_rpc_workers():
pool = eventlet.GreenPool()
quark_rpc = serve_rpc()
pool.spawn(quark_rpc.wait)
pool.waitall()
def boot_server(server_func):
# the configuration will be read into the cfg.CONF global data structure
config.init(sys.argv[1:])
config.setup_logging()
config.set_config_defaults()
if not cfg.CONF.config_file:
sys.exit(_("ERROR: Unable to find configuration file via the default"
" search paths (~/.neutron/, ~/, /etc/neutron/, /etc/) and"
" the '--config-file' option!"))
try:
server_func()
except KeyboardInterrupt:
pass
except RuntimeError as e:
sys.exit(_("ERROR: %s") % e)
def main():
boot_server(start_api_and_rpc_workers)
class QuarkRpcTestApi(object):
"""This class is used for testing QuarkRpcTestCallback."""
def __init__(self):
target = messaging.Target(topic=CONF.QUARK_ASYNC.topic)
self.client = n_rpc.get_client(target)
def stuff(self, context):
cctxt = self.client.prepare(version='1.0')
return cctxt.call(context, 'stuff')
class QuarkAsyncTestContext(object):
"""This class is used for testing QuarkRpcTestCallback."""
def __init__(self):
self.time = time.ctime()
def to_dict(self):
return {"application": "rpc-client", "time": time.ctime()}
def test_main():
config.init(sys.argv[1:])
config.setup_logging()
config.set_config_defaults()
if not cfg.CONF.config_file:
sys.exit(_("ERROR: Unable to find configuration file via the default"
" search paths (~/.neutron/, ~/, /etc/neutron/, /etc/) and"
" the '--config-file' option!"))
context = QuarkAsyncTestContext() # typically context is neutron context
client = QuarkRpcTestApi()
LOG.info(client.stuff(context))
time.sleep(0) # necessary for preventing Timeout exceptions
if __name__ == "__main__":
main()
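
The worker reads its settings from the QUARK_ASYNC option group registered above, so the oslo.config file handed to it needs a matching section. A hedged sketch of such a section is shown below as a Python string purely for illustration; the option names come from service_opts, while every value is a placeholder to adapt to the deployment. Note that boot_server() and test_main() both exit unless a readable configuration is supplied via --config-file.

# Hypothetical contents of a config file passed via --config-file:
QUARK_ASYNC_CONF_EXAMPLE = """
[QUARK_ASYNC]
topic = quark_async
transport_url = rabbit://guest:guest@localhost:5672/
rpc_workers = 1
periodic_interval = 40
periodic_fuzzy_delay = 5
"""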

View File

@@ -21,6 +21,8 @@ console_scripts =
quark-db-manage = quark.db.migration.alembic.cli:main
gunicorn-neutron-server = quark.gunicorn_server:main
quark-agent = quark.agent.agent:main
quark-async-worker = quark.tools.async_worker:main
quark-async-tester = quark.tools.async_worker:test_main
ip_availability = quark.ip_availability:main
redis_sg_tool = quark.tools.redis_sg_tool:main
null_routes = quark.tools.null_routes:main
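
Once the package is reinstalled, these entry points become executables, so the worker and its RPC smoke-test client can be started with, for example, quark-async-worker --config-file /etc/neutron/neutron.conf and quark-async-tester --config-file /etc/neutron/neutron.conf; the config path is only an assumption, but both commands refuse to start without a --config-file, as boot_server() and test_main() show above.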