Update latest OSLO.

Changes include:
1. Removing the 'extras' in the pip-requires
2. Fixes for fake implementations for RPC calls
3. Version updates due to common version update

Change-Id: Iefd32b3f7d529943b078e6d927d06043286ff94e
This commit is contained in:
Gary Kotton 2013-01-15 13:00:03 +00:00
parent 98b05bc5d8
commit 0748f92e44
10 changed files with 125 additions and 45 deletions

View File

@ -1735,11 +1735,13 @@ class CommonConfigOpts(ConfigOpts):
BoolOpt('debug',
short='d',
default=False,
help='Print debugging output'),
help='Print debugging output (set logging level to '
'DEBUG instead of default WARNING level).'),
BoolOpt('verbose',
short='v',
default=False,
help='Print more verbose output'),
help='Print more verbose output (set logging level to '
'INFO instead of default WARNING level).'),
]
logging_cli_opts = [

View File

@ -57,3 +57,11 @@ def import_module(import_str):
"""Import a module."""
__import__(import_str)
return sys.modules[import_str]
def try_import(import_str, default=None):
"""Try to import a module and if it fails return default."""
try:
return import_module(import_str)
except ImportError:
return default

View File

@ -220,6 +220,11 @@ def synchronized(name, lock_file_prefix, external=False, lock_path=None):
'method': f.__name__})
retval = f(*args, **kwargs)
finally:
LOG.debug(_('Released file lock "%(lock)s" at %(path)s'
' for method "%(method)s"...'),
{'lock': name,
'path': lock_file_path,
'method': f.__name__})
# NOTE(vish): This removes the tempdir if we needed
# to create one. This is used to cleanup
# the locks left behind by unit tests.

View File

@ -49,19 +49,19 @@ from quantum.openstack.common import notifier
log_opts = [
cfg.StrOpt('logging_context_format_string',
default='%(asctime)s.%(msecs)d %(levelname)s %(name)s '
default='%(asctime)s.%(msecs)03d %(levelname)s %(name)s '
'[%(request_id)s %(user)s %(tenant)s] %(instance)s'
'%(message)s',
help='format string to use for log messages with context'),
cfg.StrOpt('logging_default_format_string',
default='%(asctime)s.%(msecs)d %(process)d %(levelname)s '
default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
'%(name)s [-] %(instance)s%(message)s',
help='format string to use for log messages without context'),
cfg.StrOpt('logging_debug_format_suffix',
default='%(funcName)s %(pathname)s:%(lineno)d',
help='data to append to log format when level is DEBUG'),
cfg.StrOpt('logging_exception_prefix',
default='%(asctime)s.%(msecs)d %(process)d TRACE %(name)s '
default='%(asctime)s.%(msecs)03d %(process)d TRACE %(name)s '
'%(instance)s',
help='prefix each line of exception output with this format'),
cfg.ListOpt('default_log_levels',
@ -361,10 +361,12 @@ def _setup_logging_from_conf(product_name):
datefmt=datefmt))
handler.setFormatter(LegacyFormatter(datefmt=datefmt))
if CONF.verbose or CONF.debug:
if CONF.debug:
log_root.setLevel(logging.DEBUG)
else:
elif CONF.verbose:
log_root.setLevel(logging.INFO)
else:
log_root.setLevel(logging.WARNING)
level = logging.NOTSET
for pair in CONF.default_log_levels:

View File

@ -167,7 +167,7 @@ def cast(conf, context, topic, msg):
pass
def notify(conf, context, topic, msg):
def notify(conf, context, topic, msg, envelope):
check_serialize(msg)

View File

@ -15,6 +15,7 @@
# under the License.
import pprint
import os
import socket
import string
import sys
@ -29,6 +30,7 @@ from quantum.openstack.common import cfg
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import importutils
from quantum.openstack.common import jsonutils
from quantum.openstack.common import processutils as utils
from quantum.openstack.common.rpc import common as rpc_common
@ -61,6 +63,10 @@ zmq_opts = [
cfg.IntOpt('rpc_zmq_contexts', default=1,
help='Number of ZeroMQ contexts, defaults to 1'),
cfg.IntOpt('rpc_zmq_topic_backlog', default=None,
help='Maximum number of ingress messages to locally buffer '
'per topic. Default is unlimited.'),
cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
help='Directory for holding IPC sockets'),
@ -413,12 +419,6 @@ class ZmqProxy(ZmqBaseReactor):
super(ZmqProxy, self).__init__(conf)
self.topic_proxy = {}
ipc_dir = CONF.rpc_zmq_ipc_dir
self.topic_proxy['zmq_replies'] = \
ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ),
zmq.PUB, bind=True)
self.sockets.append(self.topic_proxy['zmq_replies'])
def consume(self, sock):
ipc_dir = CONF.rpc_zmq_ipc_dir
@ -444,20 +444,81 @@ class ZmqProxy(ZmqBaseReactor):
sock_type = zmq.PUSH
if not topic in self.topic_proxy:
outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
def publisher(waiter):
LOG.info(_("Creating proxy for topic: %s"), topic)
try:
out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
(ipc_dir, topic),
sock_type, bind=True)
self.topic_proxy[topic] = outq
self.sockets.append(outq)
LOG.info(_("Created topic proxy: %s"), topic)
except RPCException:
waiter.send_exception(*sys.exc_info())
return
self.topic_proxy[topic] = eventlet.queue.LightQueue(
CONF.rpc_zmq_topic_backlog)
self.sockets.append(out_sock)
# It takes some time for a pub socket to open,
# before we can have any faith in doing a send() to it.
if sock_type == zmq.PUB:
eventlet.sleep(.5)
LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
self.topic_proxy[topic].send(data)
LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
waiter.send(True)
while(True):
data = self.topic_proxy[topic].get()
out_sock.send(data)
LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") %
{'data': data})
wait_sock_creation = eventlet.event.Event()
eventlet.spawn(publisher, wait_sock_creation)
try:
wait_sock_creation.wait()
except RPCException:
LOG.error(_("Topic socket file creation failed."))
return
try:
self.topic_proxy[topic].put_nowait(data)
LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") %
{'data': data})
except eventlet.queue.Full:
LOG.error(_("Local per-topic backlog buffer full for topic "
"%(topic)s. Dropping message.") % {'topic': topic})
def consume_in_thread(self):
"""Runs the ZmqProxy service"""
ipc_dir = CONF.rpc_zmq_ipc_dir
consume_in = "tcp://%s:%s" % \
(CONF.rpc_zmq_bind_address,
CONF.rpc_zmq_port)
consumption_proxy = InternalContext(None)
if not os.path.isdir(ipc_dir):
try:
utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
ipc_dir, run_as_root=True)
utils.execute('chmod', '750', ipc_dir, run_as_root=True)
except utils.ProcessExecutionError:
LOG.error(_("Could not create IPC directory %s") %
(ipc_dir, ))
raise
try:
self.register(consumption_proxy,
consume_in,
zmq.PULL,
out_bind=True)
except zmq.ZMQError:
LOG.error(_("Could not create ZeroMQ receiver daemon. "
"Socket may already be in use."))
raise
super(ZmqProxy, self).consume_in_thread()
class ZmqReactor(ZmqBaseReactor):
@ -551,7 +612,8 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
conn.close()
def _call(addr, context, msg_id, topic, msg, timeout=None):
def _call(addr, context, msg_id, topic, msg, timeout=None,
serialize=True, force_envelope=False):
# timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout
@ -586,7 +648,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
)
LOG.debug(_("Sending cast"))
_cast(addr, context, msg_id, topic, payload)
_cast(addr, context, msg_id, topic, payload,
serialize=serialize, force_envelope=force_envelope)
LOG.debug(_("Cast sent; Waiting reply"))
# Blocks until receives reply
@ -642,7 +705,8 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
_topic, _topic, msg, timeout, serialize,
force_envelope)
return
return method(_addr, context, _topic, _topic, msg, timeout)
return method(_addr, context, _topic, _topic, msg, timeout,
serialize, force_envelope)
def create_connection(conf, new=True):

View File

@ -27,17 +27,17 @@ import sys
import time
import eventlet
import extras
import logging as std_logging
from quantum.openstack.common import cfg
from quantum.openstack.common import eventlet_backdoor
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import importutils
from quantum.openstack.common import log as logging
from quantum.openstack.common import threadgroup
rpc = extras.try_import('quantum.openstack.common.rpc')
rpc = importutils.try_import('quantum.openstack.common.rpc')
CONF = cfg.CONF
LOG = logging.getLogger(__name__)

View File

@ -5,7 +5,6 @@ amqplib==0.6.1
anyjson>=0.2.4
argparse
eventlet>=0.9.17
extras
greenlet>=0.3.1
httplib2
iso8601>=0.1.4