Merge "Replace publisher name by URL"
This commit is contained in:
commit
1d13c312ee
@ -118,6 +118,9 @@ class Pipeline(object):
|
||||
|
||||
self.publishers = []
|
||||
for p in cfg['publishers']:
|
||||
if '://' not in p:
|
||||
# Support old format without URL
|
||||
p = p + "://"
|
||||
try:
|
||||
self.publishers.append(publisher.get_publisher(p))
|
||||
except Exception:
|
||||
|
@ -20,16 +20,18 @@
|
||||
|
||||
import abc
|
||||
from stevedore import driver
|
||||
import urlparse
|
||||
|
||||
|
||||
def get_publisher(name, namespace='ceilometer.publisher'):
|
||||
def get_publisher(url, namespace='ceilometer.publisher'):
|
||||
"""Get publisher driver and load it.
|
||||
|
||||
:param name: Name of the publisher driver.
|
||||
:param URL: URL for the publisher
|
||||
:param namespace: Namespace to use to look for drivers.
|
||||
"""
|
||||
loaded_driver = driver.DriverManager(namespace, name)
|
||||
return loaded_driver.driver()
|
||||
parse_result = urlparse.urlparse(url)
|
||||
loaded_driver = driver.DriverManager(namespace, parse_result.scheme)
|
||||
return loaded_driver.driver(parse_result)
|
||||
|
||||
|
||||
class PublisherBase(object):
|
||||
@ -37,6 +39,9 @@ class PublisherBase(object):
|
||||
|
||||
__metaclass__ = abc.ABCMeta
|
||||
|
||||
def __init__(self, parsed_url):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def publish_counters(self, context, counters, source):
|
||||
"Publish counters into final conduit."
|
||||
|
@ -24,7 +24,7 @@ from ceilometer import publisher
|
||||
class TestPublisher(publisher.PublisherBase):
|
||||
"""Publisher used in unit testing."""
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self, parsed_url):
|
||||
self.counters = []
|
||||
|
||||
def publish_counters(self, context, counters, source):
|
||||
|
@ -20,42 +20,24 @@
|
||||
|
||||
from ceilometer import publisher
|
||||
from ceilometer.openstack.common import log
|
||||
from ceilometer.openstack.common import network_utils
|
||||
from ceilometer.openstack.common.gettextutils import _
|
||||
from oslo.config import cfg
|
||||
import msgpack
|
||||
import socket
|
||||
from oslo.config import cfg
|
||||
|
||||
cfg.CONF.import_opt('udp_port', 'ceilometer.collector.service',
|
||||
group='collector')
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
UDP_PUBLISH_GROUP = cfg.OptGroup(name='publisher_udp',
|
||||
title='Options for UDP publisher')
|
||||
|
||||
UDP_PUBLISH_OPTS = [
|
||||
cfg.StrOpt('host',
|
||||
default="localhost",
|
||||
help='The host target to publish metering records to.',
|
||||
),
|
||||
cfg.IntOpt('port',
|
||||
default=4952,
|
||||
help='The port to send UDP meters to.',
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def register_opts(config):
|
||||
"""Register the options for publishing UDP messages.
|
||||
"""
|
||||
config.register_group(UDP_PUBLISH_GROUP)
|
||||
config.register_opts(UDP_PUBLISH_OPTS,
|
||||
group=UDP_PUBLISH_GROUP)
|
||||
|
||||
|
||||
register_opts(cfg.CONF)
|
||||
|
||||
|
||||
class UDPPublisher(publisher.PublisherBase):
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self, parsed_url):
|
||||
self.host, self.port = network_utils.parse_host_port(
|
||||
parsed_url.netloc,
|
||||
default_port=cfg.CONF.collector.udp_port)
|
||||
self.socket = socket.socket(socket.AF_INET,
|
||||
socket.SOCK_DGRAM)
|
||||
|
||||
@ -70,14 +52,13 @@ class UDPPublisher(publisher.PublisherBase):
|
||||
for counter in counters:
|
||||
msg = counter._asdict()
|
||||
msg['source'] = source
|
||||
host = cfg.CONF.publisher_udp.host
|
||||
port = cfg.CONF.publisher_udp.port
|
||||
host = self.host
|
||||
port = self.port
|
||||
LOG.debug(_("Publishing counter %(msg)s over "
|
||||
"UDP to %(host)s:%(port)d") % locals())
|
||||
try:
|
||||
self.socket.sendto(msgpack.dumps(msg),
|
||||
(cfg.CONF.publisher_udp.host,
|
||||
cfg.CONF.publisher_udp.port))
|
||||
(self.host, self.port))
|
||||
except Exception as e:
|
||||
LOG.warn(_("Unable to send counter over UDP"))
|
||||
LOG.exception(e)
|
||||
|
@ -16,17 +16,6 @@
|
||||
#pipeline_cfg_file=pipeline.yaml
|
||||
|
||||
|
||||
#
|
||||
# Options defined in ceilometer.policy
|
||||
#
|
||||
|
||||
# JSON file representing policy (string value)
|
||||
#policy_file=policy.json
|
||||
|
||||
# Rule checked when requested rule is not found (string value)
|
||||
#policy_default_rule=default
|
||||
|
||||
|
||||
#
|
||||
# Options defined in ceilometer.api.app
|
||||
#
|
||||
@ -239,6 +228,18 @@
|
||||
#notification_topics=notifications
|
||||
|
||||
|
||||
#
|
||||
# Options defined in ceilometer.openstack.common.policy
|
||||
#
|
||||
|
||||
# JSON file containing policy (string value)
|
||||
#policy_file=policy.json
|
||||
|
||||
# Rule enforced when requested rule is not found (string
|
||||
# value)
|
||||
#policy_default_rule=default
|
||||
|
||||
|
||||
#
|
||||
# Options defined in ceilometer.openstack.common.rpc
|
||||
#
|
||||
@ -273,15 +274,6 @@
|
||||
#control_exchange=openstack
|
||||
|
||||
|
||||
#
|
||||
# Options defined in ceilometer.openstack.common.rpc.amqp
|
||||
#
|
||||
|
||||
# Enable a fast single reply queue if using AMQP based RPC
|
||||
# like RabbitMQ or Qpid. (boolean value)
|
||||
#amqp_rpc_single_reply_queue=false
|
||||
|
||||
|
||||
#
|
||||
# Options defined in ceilometer.openstack.common.rpc.impl_kombu
|
||||
#
|
||||
@ -407,7 +399,7 @@
|
||||
# Name of this node. Must be a valid hostname, FQDN, or IP
|
||||
# address. Must match "host" option, if running Nova. (string
|
||||
# value)
|
||||
#rpc_zmq_host=dex
|
||||
#rpc_zmq_host=ceilometer
|
||||
|
||||
|
||||
#
|
||||
@ -445,20 +437,6 @@
|
||||
#cinder_control_exchange=cinder
|
||||
|
||||
|
||||
[publisher_udp]
|
||||
|
||||
#
|
||||
# Options defined in ceilometer.publisher.udp
|
||||
#
|
||||
|
||||
# The host target to publish metering records to. (string
|
||||
# value)
|
||||
#host=localhost
|
||||
|
||||
# The port to send UDP meters to. (integer value)
|
||||
#port=4952
|
||||
|
||||
|
||||
[database]
|
||||
|
||||
#
|
||||
@ -479,7 +457,11 @@
|
||||
|
||||
# The SQLAlchemy connection string used to connect to the
|
||||
# database (string value)
|
||||
#connection=sqlite:////common/db/$sqlite_db
|
||||
#connection=sqlite:////ceilometer/openstack/common/db/$sqlite_db
|
||||
|
||||
# The SQLAlchemy connection string used to connect to the
|
||||
# slave database (string value)
|
||||
#slave_connection=
|
||||
|
||||
# timeout before idle sql connections are reaped (integer
|
||||
# value)
|
||||
@ -491,7 +473,7 @@
|
||||
|
||||
# Maximum number of SQL connections to keep open in a pool
|
||||
# (integer value)
|
||||
#max_pool_size=5
|
||||
#max_pool_size=<None>
|
||||
|
||||
# maximum db connection retries during startup. (setting -1
|
||||
# implies an infinite retry count) (integer value)
|
||||
@ -513,6 +495,10 @@
|
||||
# value)
|
||||
#connection_trace=false
|
||||
|
||||
# If set, use this value for pool_timeout with sqlalchemy
|
||||
# (integer value)
|
||||
#pool_timeout=<None>
|
||||
|
||||
|
||||
[publisher_meter]
|
||||
|
||||
@ -614,4 +600,4 @@
|
||||
#password=<None>
|
||||
|
||||
|
||||
# Total option count: 117
|
||||
# Total option count: 116
|
||||
|
@ -6,4 +6,4 @@
|
||||
- "*"
|
||||
transformers:
|
||||
publishers:
|
||||
- meter
|
||||
- meter://
|
||||
|
@ -255,7 +255,7 @@ class TestPublish(base.TestCase):
|
||||
super(TestPublish, self).setUp()
|
||||
self.published = []
|
||||
self.stubs.Set(rpc, 'cast', self.faux_cast)
|
||||
publisher = meter.MeterPublisher()
|
||||
publisher = meter.MeterPublisher(None)
|
||||
publisher.publish_counters(None,
|
||||
self.test_data,
|
||||
'test')
|
||||
|
@ -22,6 +22,7 @@ import datetime
|
||||
import mock
|
||||
import msgpack
|
||||
from oslo.config import cfg
|
||||
import urlparse
|
||||
|
||||
from ceilometer import counter
|
||||
from ceilometer.publisher import udp
|
||||
@ -104,7 +105,7 @@ class TestUDPPublisher(base.TestCase):
|
||||
self.data_sent = []
|
||||
with mock.patch('socket.socket',
|
||||
self._make_fake_socket(self.data_sent)):
|
||||
publisher = udp.UDPPublisher()
|
||||
publisher = udp.UDPPublisher(urlparse.urlparse('udp://somehost'))
|
||||
publisher.publish_counters(None,
|
||||
self.test_data,
|
||||
self.COUNTER_SOURCE)
|
||||
@ -122,8 +123,8 @@ class TestUDPPublisher(base.TestCase):
|
||||
sent_counters.append(counter)
|
||||
|
||||
# Check destination
|
||||
self.assertEqual(dest, (cfg.CONF.publisher_udp.host,
|
||||
cfg.CONF.publisher_udp.port))
|
||||
self.assertEqual(dest, ('somehost',
|
||||
cfg.CONF.collector.udp_port))
|
||||
|
||||
# Check that counters are equal
|
||||
self.assertEqual(sorted(sent_counters),
|
||||
@ -141,7 +142,7 @@ class TestUDPPublisher(base.TestCase):
|
||||
def test_publish_error(self):
|
||||
with mock.patch('socket.socket',
|
||||
self._make_broken_socket):
|
||||
publisher = udp.UDPPublisher()
|
||||
publisher = udp.UDPPublisher(urlparse.urlparse('udp://localhost'))
|
||||
publisher.publish_counters(None,
|
||||
self.test_data,
|
||||
self.COUNTER_SOURCE)
|
||||
|
@ -54,13 +54,13 @@ class TestPipeline(base.TestCase):
|
||||
|
||||
raise KeyError(name)
|
||||
|
||||
def get_publisher(self, name, namespace=''):
|
||||
fake_drivers = {'test': test_publisher.TestPublisher,
|
||||
'new': test_publisher.TestPublisher,
|
||||
'except': self.PublisherClassException}
|
||||
return fake_drivers[name]()
|
||||
def get_publisher(self, url, namespace=''):
|
||||
fake_drivers = {'test://': test_publisher.TestPublisher,
|
||||
'new://': test_publisher.TestPublisher,
|
||||
'except://': self.PublisherClassException}
|
||||
return fake_drivers[url](url)
|
||||
|
||||
class PublisherClassException():
|
||||
class PublisherClassException(publisher.PublisherBase):
|
||||
def publish_counters(self, ctxt, counters, source):
|
||||
raise Exception()
|
||||
|
||||
@ -127,7 +127,7 @@ class TestPipeline(base.TestCase):
|
||||
{'name': "update",
|
||||
'parameters': {}}
|
||||
],
|
||||
'publishers': ["test"],
|
||||
'publishers': ["test://"],
|
||||
}, ]
|
||||
|
||||
def _exception_create_pipelinemanager(self):
|
||||
@ -475,7 +475,7 @@ class TestPipeline(base.TestCase):
|
||||
== 'a_update')
|
||||
|
||||
def test_multiple_publisher(self):
|
||||
self.pipeline_cfg[0]['publishers'] = ['test', 'new']
|
||||
self.pipeline_cfg[0]['publishers'] = ['test://', 'new://']
|
||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||
self.transformer_manager)
|
||||
|
||||
@ -492,7 +492,7 @@ class TestPipeline(base.TestCase):
|
||||
'a_update')
|
||||
|
||||
def test_multiple_publisher_isolation(self):
|
||||
self.pipeline_cfg[0]['publishers'] = ['except', 'new']
|
||||
self.pipeline_cfg[0]['publishers'] = ['except://', 'new://']
|
||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||
self.transformer_manager)
|
||||
with pipeline_manager.publisher(None, None) as p:
|
||||
@ -614,7 +614,7 @@ class TestPipeline(base.TestCase):
|
||||
{'name': "update",
|
||||
'parameters': {}}
|
||||
],
|
||||
'publishers': ["test"],
|
||||
'publishers': ["test://"],
|
||||
}, ]
|
||||
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||
self.transformer_manager)
|
||||
|
Loading…
x
Reference in New Issue
Block a user