diff --git a/ceilometer/openstack/common/cfg.py b/ceilometer/openstack/common/cfg.py index f65492aaf..b024da8c8 100644 --- a/ceilometer/openstack/common/cfg.py +++ b/ceilometer/openstack/common/cfg.py @@ -236,10 +236,10 @@ log files: This module also contains a global instance of the CommonConfigOpts class in order to support a common usage pattern in OpenStack: - from openstack.common import cfg + from ceilometer.openstack.common import cfg opts = [ - cfg.StrOpt('bind_host' default='0.0.0.0'), + cfg.StrOpt('bind_host', default='0.0.0.0'), cfg.IntOpt('bind_port', default=9292), ] diff --git a/ceilometer/openstack/common/gettextutils.py b/ceilometer/openstack/common/gettextutils.py index 235350cc4..f12fa1c25 100644 --- a/ceilometer/openstack/common/gettextutils.py +++ b/ceilometer/openstack/common/gettextutils.py @@ -20,7 +20,7 @@ gettext for openstack-common modules. Usual usage in an openstack.common module: - from openstack.common.gettextutils import _ + from ceilometer.openstack.common.gettextutils import _ """ import gettext diff --git a/ceilometer/openstack/common/log.py b/ceilometer/openstack/common/log.py index 0683f98de..25e52139b 100644 --- a/ceilometer/openstack/common/log.py +++ b/ceilometer/openstack/common/log.py @@ -76,6 +76,9 @@ log_opts = [ cfg.BoolOpt('publish_errors', default=False, help='publish error events'), + cfg.BoolOpt('fatal_deprecations', + default=False, + help='make deprecations fatal'), # NOTE(mikal): there are two options here because sometimes we are handed # a full instance (and could include more information), and other times we @@ -170,6 +173,14 @@ class ContextAdapter(logging.LoggerAdapter): def audit(self, msg, *args, **kwargs): self.log(logging.AUDIT, msg, *args, **kwargs) + def deprecated(self, msg, *args, **kwargs): + stdmsg = _("Deprecated Config: %s") % msg + if CONF.fatal_deprecations: + self.critical(stdmsg, *args, **kwargs) + raise DeprecatedConfig(msg=stdmsg) + else: + self.warn(stdmsg, *args, **kwargs) + def process(self, msg, kwargs): if 'extra' not in kwargs: kwargs['extra'] = {} @@ -450,3 +461,10 @@ class ColorHandler(logging.StreamHandler): def format(self, record): record.color = self.LEVEL_COLORS[record.levelno] return logging.StreamHandler.format(self, record) + + +class DeprecatedConfig(Exception): + message = _("Fatal call to deprecated config: %(msg)s") + + def __init__(self, msg): + super(Exception, self).__init__(self.message % dict(msg=msg)) diff --git a/ceilometer/openstack/common/rpc/impl_kombu.py b/ceilometer/openstack/common/rpc/impl_kombu.py index a68154e43..2862edfdd 100644 --- a/ceilometer/openstack/common/rpc/impl_kombu.py +++ b/ceilometer/openstack/common/rpc/impl_kombu.py @@ -31,9 +31,9 @@ import kombu.messaging from ceilometer.openstack.common import cfg from ceilometer.openstack.common.gettextutils import _ +from ceilometer.openstack.common import network_utils from ceilometer.openstack.common.rpc import amqp as rpc_amqp from ceilometer.openstack.common.rpc import common as rpc_common -from ceilometer.openstack.common import network_utils kombu_opts = [ cfg.StrOpt('kombu_ssl_version', @@ -267,6 +267,7 @@ class FanoutConsumer(ConsumerBase): # Default options options = {'durable': False, + 'queue_arguments': _get_queue_arguments(conf), 'auto_delete': True, 'exclusive': True} options.update(kwargs) diff --git a/ceilometer/openstack/common/rpc/impl_zmq.py b/ceilometer/openstack/common/rpc/impl_zmq.py index 0b2e50a7f..bf6a81dfc 100644 --- a/ceilometer/openstack/common/rpc/impl_zmq.py +++ b/ceilometer/openstack/common/rpc/impl_zmq.py @@ -58,9 +58,6 @@ zmq_opts = [ cfg.IntOpt('rpc_zmq_port', default=9501, help='ZeroMQ receiver listening port'), - cfg.IntOpt('rpc_zmq_port_pub', default=9502, - help='ZeroMQ fanout publisher port'), - cfg.IntOpt('rpc_zmq_contexts', default=1, help='Number of ZeroMQ contexts, defaults to 1'), @@ -209,7 +206,7 @@ class ZmqClient(object): self.outq = ZmqSocket(addr, socket_type, bind=bind) def cast(self, msg_id, topic, data): - self.outq.send([str(topic), str(msg_id), str('cast'), + self.outq.send([str(msg_id), str(topic), str('cast'), _serialize(data)]) def close(self): @@ -302,9 +299,6 @@ class ConsumerBase(object): else: return [result] - def consume(self, sock): - raise NotImplementedError() - def process(self, style, target, proxy, ctx, data): # Method starting with - are # processed internally. (non-valid method name) @@ -417,17 +411,12 @@ class ZmqProxy(ZmqBaseReactor): zmq.PUB, bind=True) self.sockets.append(self.topic_proxy['zmq_replies']) - self.topic_proxy['fanout~'] = \ - ZmqSocket("tcp://%s:%s" % (CONF.rpc_zmq_bind_address, - CONF.rpc_zmq_port_pub), zmq.PUB, bind=True) - self.sockets.append(self.topic_proxy['fanout~']) - def consume(self, sock): ipc_dir = CONF.rpc_zmq_ipc_dir #TODO(ewindisch): use zero-copy (i.e. references, not copying) data = sock.recv() - topic, msg_id, style, in_msg = data + msg_id, topic, style, in_msg = data topic = topic.split('.', 1)[0] LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) @@ -435,11 +424,6 @@ class ZmqProxy(ZmqBaseReactor): # Handle zmq_replies magic if topic.startswith('fanout~'): sock_type = zmq.PUB - - # This doesn't change what is in the message, - # it only specifies that these messages go to - # the generic fanout topic. - topic = 'fanout~' elif topic.startswith('zmq_replies'): sock_type = zmq.PUB inside = _deserialize(in_msg) @@ -450,32 +434,23 @@ class ZmqProxy(ZmqBaseReactor): else: sock_type = zmq.PUSH - if not topic in self.topic_proxy: - outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), - sock_type, bind=True) - self.topic_proxy[topic] = outq - self.sockets.append(outq) - LOG.info(_("Created topic proxy: %s"), topic) + if not topic in self.topic_proxy: + outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), + sock_type, bind=True) + self.topic_proxy[topic] = outq + self.sockets.append(outq) + LOG.info(_("Created topic proxy: %s"), topic) + + # It takes some time for a pub socket to open, + # before we can have any faith in doing a send() to it. + if sock_type == zmq.PUB: + eventlet.sleep(.5) LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data}) self.topic_proxy[topic].send(data) LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data}) -class CallbackReactor(ZmqBaseReactor): - """ - A consumer class passing messages to a callback - """ - - def __init__(self, conf, callback): - self._cb = callback - super(CallbackReactor, self).__init__(conf) - - def consume(self, sock): - data = sock.recv() - self._cb(data[3]) - - class ZmqReactor(ZmqBaseReactor): """ A consumer class implementing a @@ -496,7 +471,7 @@ class ZmqReactor(ZmqBaseReactor): self.mapping[sock].send(data) return - topic, msg_id, style, in_msg = data + msg_id, topic, style, in_msg = data ctx, request = _deserialize(in_msg) ctx = RpcContext.unmarshal(ctx) @@ -513,26 +488,6 @@ class Connection(rpc_common.Connection): def __init__(self, conf): self.reactor = ZmqReactor(conf) - def _consume_fanout(self, reactor, topic, proxy, bind=False): - for topic, host in matchmaker.queues("publishers~%s" % (topic, )): - inaddr = "tcp://%s:%s" % (host, CONF.rpc_zmq_port) - reactor.register(proxy, inaddr, zmq.SUB, in_bind=bind) - - def declare_topic_consumer(self, topic, callback=None, - queue_name=None): - """declare_topic_consumer is a private method, but - it is being used by Quantum (Folsom). - This has been added compatibility. - """ - # Only consume on the base topic name. - topic = topic.split('.', 1)[0] - - if CONF.rpc_zmq_host in matchmaker.queues("fanout~%s" % (topic, )): - return - - reactor = CallbackReactor(CONF, callback) - self._consume_fanout(reactor, topic, None, bind=False) - def create_consumer(self, topic, proxy, fanout=False): # Only consume on the base topic name. topic = topic.split('.', 1)[0] @@ -540,35 +495,22 @@ class Connection(rpc_common.Connection): LOG.info(_("Create Consumer for topic (%(topic)s)") % {'topic': topic}) - # Consume direct-push fanout messages (relay to local consumers) + # Subscription scenarios if fanout: - # If we're not in here, we can't receive direct fanout messages - if CONF.rpc_zmq_host in matchmaker.queues(topic): - # Consume from all remote publishers. - self._consume_fanout(self.reactor, topic, proxy) - else: - LOG.warn("This service cannot receive direct PUSH fanout " - "messages without being known by the matchmaker.") - return - - # Configure consumer for direct pushes. - subscribe = (topic, fanout)[type(fanout) == str] + subscribe = ('', fanout)[type(fanout) == str] sock_type = zmq.SUB topic = 'fanout~' + topic - - inaddr = "tcp://127.0.0.1:%s" % (CONF.rpc_zmq_port_pub, ) else: sock_type = zmq.PULL subscribe = None - # Receive messages from (local) proxy - inaddr = "ipc://%s/zmq_topic_%s" % \ - (CONF.rpc_zmq_ipc_dir, topic) + # Receive messages from (local) proxy + inaddr = "ipc://%s/zmq_topic_%s" % \ + (CONF.rpc_zmq_ipc_dir, topic) LOG.debug(_("Consumer is a zmq.%s"), ['PULL', 'SUB'][sock_type == zmq.SUB]) - # Consume messages from local rpc-zmq-receiver daemon. self.reactor.register(proxy, inaddr, sock_type, subscribe=subscribe, in_bind=False) diff --git a/ceilometer/openstack/common/rpc/matchmaker.py b/ceilometer/openstack/common/rpc/matchmaker.py index 268c05f42..bf4f85d4b 100644 --- a/ceilometer/openstack/common/rpc/matchmaker.py +++ b/ceilometer/openstack/common/rpc/matchmaker.py @@ -132,14 +132,6 @@ class FanoutBinding(Binding): return False -class PublisherBinding(Binding): - """Match on publishers keys, where key starts with 'publishers.' string.""" - def test(self, key): - if key.startswith('publishers~'): - return True - return False - - class StubExchange(Exchange): """Exchange that does nothing.""" def run(self, key): @@ -190,23 +182,6 @@ class RoundRobinRingExchange(RingExchange): return [(key + '.' + host, host)] -class PublisherRingExchange(RingExchange): - """Fanout Exchange based on a hashmap.""" - def __init__(self, ring=None): - super(PublisherRingExchange, self).__init__(ring) - - def run(self, key): - # Assume starts with "publishers~", strip it for lookup. - nkey = key.split('publishers~')[1:][0] - if not self._ring_has(nkey): - LOG.warn( - _("No key defining hosts for topic '%s', " - "see ringfile") % (nkey, ) - ) - return [] - return map(lambda x: (key + '.' + x, x), self.ring[nkey]) - - class FanoutRingExchange(RingExchange): """Fanout Exchange based on a hashmap.""" def __init__(self, ring=None): @@ -221,8 +196,7 @@ class FanoutRingExchange(RingExchange): "see ringfile") % (nkey, ) ) return [] - return map(lambda x: (key + '.' + x, x), self.ring[nkey] + - ['localhost']) + return map(lambda x: (key + '.' + x, x), self.ring[nkey]) class LocalhostExchange(Exchange): @@ -253,7 +227,6 @@ class MatchMakerRing(MatchMakerBase): """ def __init__(self, ring=None): super(MatchMakerRing, self).__init__() - self.add_binding(PublisherBinding(), PublisherRingExchange(ring)) self.add_binding(FanoutBinding(), FanoutRingExchange(ring)) self.add_binding(DirectBinding(), DirectExchange()) self.add_binding(TopicBinding(), RoundRobinRingExchange(ring)) @@ -266,7 +239,6 @@ class MatchMakerLocalhost(MatchMakerBase): """ def __init__(self): super(MatchMakerLocalhost, self).__init__() - self.add_binding(PublisherBinding(), LocalhostExchange()) self.add_binding(FanoutBinding(), LocalhostExchange()) self.add_binding(DirectBinding(), DirectExchange()) self.add_binding(TopicBinding(), LocalhostExchange()) @@ -281,7 +253,6 @@ class MatchMakerStub(MatchMakerBase): def __init__(self): super(MatchMakerLocalhost, self).__init__() - self.add_binding(PublisherBinding(), StubExchange()) self.add_binding(FanoutBinding(), StubExchange()) self.add_binding(DirectBinding(), StubExchange()) self.add_binding(TopicBinding(), StubExchange()) diff --git a/ceilometer/openstack/common/rpc/service.py b/ceilometer/openstack/common/rpc/service.py index c93481540..ab7d630fd 100644 --- a/ceilometer/openstack/common/rpc/service.py +++ b/ceilometer/openstack/common/rpc/service.py @@ -20,6 +20,7 @@ from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import log as logging from ceilometer.openstack.common import rpc +from ceilometer.openstack.common.rpc import dispatcher as rpc_dispatcher from ceilometer.openstack.common import service @@ -46,15 +47,15 @@ class Service(service.Service): LOG.debug(_("Creating Consumer connection for Service %s") % self.topic) - rpc_dispatcher = rpc.dispatcher.RpcDispatcher([self.manager]) + dispatcher = rpc_dispatcher.RpcDispatcher([self.manager]) # Share this same connection for these Consumers - self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False) + self.conn.create_consumer(self.topic, dispatcher, fanout=False) node_topic = '%s.%s' % (self.topic, self.host) - self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False) + self.conn.create_consumer(node_topic, dispatcher, fanout=False) - self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True) + self.conn.create_consumer(self.topic, dispatcher, fanout=True) # Consume from all consumers in a thread self.conn.consume_in_thread() diff --git a/ceilometer/openstack/common/setup.py b/ceilometer/openstack/common/setup.py index 317e82d87..83eef07a7 100644 --- a/ceilometer/openstack/common/setup.py +++ b/ceilometer/openstack/common/setup.py @@ -31,13 +31,13 @@ from setuptools.command import sdist def parse_mailmap(mailmap='.mailmap'): mapping = {} if os.path.exists(mailmap): - fp = open(mailmap, 'r') - for l in fp: - l = l.strip() - if not l.startswith('#') and ' ' in l: - canonical_email, alias = [x for x in l.split(' ') - if x.startswith('<')] - mapping[alias] = canonical_email + with open(mailmap, 'r') as fp: + for l in fp: + l = l.strip() + if not l.startswith('#') and ' ' in l: + canonical_email, alias = [x for x in l.split(' ') + if x.startswith('<')] + mapping[alias] = canonical_email return mapping @@ -54,7 +54,8 @@ def canonicalize_emails(changelog, mapping): def get_reqs_from_files(requirements_files): for requirements_file in requirements_files: if os.path.exists(requirements_file): - return open(requirements_file, 'r').read().split('\n') + with open(requirements_file, 'r') as fil: + return fil.read().split('\n') return [] @@ -135,15 +136,17 @@ def _get_git_next_version_suffix(branch_name): _run_shell_command("git fetch origin +refs/meta/*:refs/remotes/meta/*") milestone_cmd = "git show meta/openstack/release:%s" % branch_name milestonever = _run_shell_command(milestone_cmd) - if not milestonever: - milestonever = "" + if milestonever: + first_half = "%s~%s" % (milestonever, datestamp) + else: + first_half = datestamp + post_version = _get_git_post_version() # post version should look like: # 0.1.1.4.gcc9e28a # where the bit after the last . is the short sha, and the bit between # the last and second to last is the revno count (revno, sha) = post_version.split(".")[-2:] - first_half = "%s~%s" % (milestonever, datestamp) second_half = "%s%s.%s" % (revno_prefix, revno, sha) return ".".join((first_half, second_half)) @@ -236,7 +239,8 @@ def read_versioninfo(project): def write_versioninfo(project, version): """Write a simple file containing the version of the package.""" - open(os.path.join(project, 'versioninfo'), 'w').write("%s\n" % version) + with open(os.path.join(project, 'versioninfo'), 'w') as fil: + fil.write("%s\n" % version) def get_cmdclass(): diff --git a/ceilometer/openstack/common/timeutils.py b/ceilometer/openstack/common/timeutils.py index 93b34fc5b..86004391d 100644 --- a/ceilometer/openstack/common/timeutils.py +++ b/ceilometer/openstack/common/timeutils.py @@ -74,6 +74,11 @@ def is_older_than(before, seconds): return utcnow() - before > datetime.timedelta(seconds=seconds) +def is_newer_than(after, seconds): + """Return True if after is newer than seconds.""" + return after - utcnow() > datetime.timedelta(seconds=seconds) + + def utcnow_ts(): """Timestamp version of our utcnow function.""" return calendar.timegm(utcnow().timetuple())