spawn multiple workers in services
the collector service only ever grabs one connection to the database at a time. this causes a massive backlog on the message queue. enable ability to spawn multiple workers for notification and collector services. Change-Id: Ie85c4a6eace52c771c3eb8425839d253f8dd74cd Closes-Bug: #1291054
This commit is contained in:
parent
e0f83ff425
commit
0f038d2fe2
@ -84,8 +84,12 @@ def agent_compute():
|
|||||||
|
|
||||||
def agent_notification():
|
def agent_notification():
|
||||||
service.prepare_service()
|
service.prepare_service()
|
||||||
os_service.launch(notification.NotificationService(
|
launcher = os_service.ProcessLauncher()
|
||||||
cfg.CONF.host, 'ceilometer.agent.notification')).wait()
|
launcher.launch_service(
|
||||||
|
notification.NotificationService(cfg.CONF.host,
|
||||||
|
'ceilometer.agent.notification'),
|
||||||
|
workers=service.get_workers('notification'))
|
||||||
|
launcher.wait()
|
||||||
|
|
||||||
|
|
||||||
def api():
|
def api():
|
||||||
@ -96,8 +100,12 @@ def api():
|
|||||||
|
|
||||||
def collector_service():
|
def collector_service():
|
||||||
service.prepare_service()
|
service.prepare_service()
|
||||||
os_service.launch(collector.CollectorService(
|
launcher = os_service.ProcessLauncher()
|
||||||
cfg.CONF.host, 'ceilometer.collector')).wait()
|
launcher.launch_service(
|
||||||
|
collector.CollectorService(cfg.CONF.host,
|
||||||
|
'ceilometer.collector'),
|
||||||
|
workers=service.get_workers('collector'))
|
||||||
|
launcher.wait()
|
||||||
|
|
||||||
|
|
||||||
def storage_dbsync():
|
def storage_dbsync():
|
||||||
|
@ -28,6 +28,7 @@ from ceilometer.openstack.common import gettextutils
|
|||||||
from ceilometer.openstack.common.gettextutils import _ # noqa
|
from ceilometer.openstack.common.gettextutils import _ # noqa
|
||||||
from ceilometer.openstack.common import log
|
from ceilometer.openstack.common import log
|
||||||
from ceilometer.openstack.common import rpc
|
from ceilometer.openstack.common import rpc
|
||||||
|
from ceilometer import utils
|
||||||
|
|
||||||
|
|
||||||
OPTS = [
|
OPTS = [
|
||||||
@ -40,6 +41,12 @@ OPTS = [
|
|||||||
deprecated_group="collector",
|
deprecated_group="collector",
|
||||||
default=['database'],
|
default=['database'],
|
||||||
help='Dispatcher to process data.'),
|
help='Dispatcher to process data.'),
|
||||||
|
cfg.IntOpt('collector_workers',
|
||||||
|
help='Number of workers for collector service. The default '
|
||||||
|
'will be equal to the number of CPUs available.'),
|
||||||
|
cfg.IntOpt('notification_workers',
|
||||||
|
help='Number of workers for notification service. The default '
|
||||||
|
'will be equal to the number of CPUs available.'),
|
||||||
]
|
]
|
||||||
cfg.CONF.register_opts(OPTS)
|
cfg.CONF.register_opts(OPTS)
|
||||||
|
|
||||||
@ -88,6 +95,11 @@ cfg.CONF.register_cli_opts(CLI_OPTIONS, group="service_credentials")
|
|||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class WorkerException(Exception):
|
||||||
|
"""Exception for errors relating to service workers
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
class DispatchedService(object):
|
class DispatchedService(object):
|
||||||
|
|
||||||
DISPATCHER_NAMESPACE = 'ceilometer.dispatcher'
|
DISPATCHER_NAMESPACE = 'ceilometer.dispatcher'
|
||||||
@ -106,6 +118,17 @@ class DispatchedService(object):
|
|||||||
self.DISPATCHER_NAMESPACE)
|
self.DISPATCHER_NAMESPACE)
|
||||||
|
|
||||||
|
|
||||||
|
def get_workers(name):
|
||||||
|
workers = (cfg.CONF.get('%s_workers' % name) or
|
||||||
|
utils.cpu_count())
|
||||||
|
if workers and workers < 1:
|
||||||
|
msg = (_("%(worker_name)s value of %(workers)s is invalid, "
|
||||||
|
"must be greater than 0") %
|
||||||
|
{'worker_name': '%s_workers' % name, 'workers': str(workers)})
|
||||||
|
raise WorkerException(msg)
|
||||||
|
return workers
|
||||||
|
|
||||||
|
|
||||||
def prepare_service(argv=None):
|
def prepare_service(argv=None):
|
||||||
gettextutils.install('ceilometer', lazy=True)
|
gettextutils.install('ceilometer', lazy=True)
|
||||||
rpc.set_defaults(control_exchange='ceilometer')
|
rpc.set_defaults(control_exchange='ceilometer')
|
||||||
|
@ -22,6 +22,7 @@ import calendar
|
|||||||
import copy
|
import copy
|
||||||
import datetime
|
import datetime
|
||||||
import decimal
|
import decimal
|
||||||
|
import multiprocessing
|
||||||
|
|
||||||
from ceilometer.openstack.common import timeutils
|
from ceilometer.openstack.common import timeutils
|
||||||
from ceilometer.openstack.common import units
|
from ceilometer.openstack.common import units
|
||||||
@ -147,3 +148,10 @@ def update_nested(original_dict, updates):
|
|||||||
else:
|
else:
|
||||||
dict_to_update[key] = updates[key]
|
dict_to_update[key] = updates[key]
|
||||||
return dict_to_update
|
return dict_to_update
|
||||||
|
|
||||||
|
|
||||||
|
def cpu_count():
|
||||||
|
try:
|
||||||
|
return multiprocessing.cpu_count() or 1
|
||||||
|
except NotImplementedError:
|
||||||
|
return 1
|
||||||
|
@ -40,6 +40,14 @@
|
|||||||
# Dispatcher to process data. (multi valued)
|
# Dispatcher to process data. (multi valued)
|
||||||
#dispatcher=database
|
#dispatcher=database
|
||||||
|
|
||||||
|
# Number of workers for collector service. The default will be
|
||||||
|
# equal to the number of CPUs available. (integer value)
|
||||||
|
#collector_workers=<None>
|
||||||
|
|
||||||
|
# Number of workers for notification service. The default will
|
||||||
|
# be equal to the number of CPUs available. (integer value)
|
||||||
|
#notification_workers=<None>
|
||||||
|
|
||||||
|
|
||||||
#
|
#
|
||||||
# Options defined in ceilometer.api.app
|
# Options defined in ceilometer.api.app
|
||||||
|
Loading…
x
Reference in New Issue
Block a user