Run pyupgrade to clean up Python 2 syntaxes

Update all .py source files by
 $ pyupgrade --py3-only $(git ls-files | grep ".py$")
to modernize the code according to Python 3 syntaxes.

pep8 errors are fixed by
 $ autopep8 --select=E127,E128,E501 --max-line-length 79 -r \
    --in-place oslo_messaging
and a few manual adjustments.

Also add the pyupgrade hook to pre-commit to avoid merging additional
Python 2 syntaxes.

Change-Id: I8115b7f8c5d27ce935e4422c351add4bb72e354f
This commit is contained in:
Takashi Kajinami 2024-10-19 23:25:48 +09:00
parent 7f52894bbe
commit 34fd61bdc2
68 changed files with 334 additions and 356 deletions

View File

@ -28,3 +28,8 @@ repos:
hooks:
- id: bandit
args: ['-x', 'tests,tools']
- repo: https://github.com/asottile/pyupgrade
rev: v3.18.0
hooks:
- id: pyupgrade
args: [--py3-only]

View File

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2020 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,3 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -49,7 +49,7 @@ class RpcContext(rpc_common.CommonRpcContext):
def __init__(self, **kwargs):
self.msg_id = kwargs.pop('msg_id', None)
self.reply_q = kwargs.pop('reply_q', None)
super(RpcContext, self).__init__(**kwargs)
super().__init__(**kwargs)
def deepcopy(self):
values = self.to_dict()
@ -91,7 +91,7 @@ def pack_context(msg, context):
for (key, value) in context_d)
class _MsgIdCache(object):
class _MsgIdCache:
"""This class checks any duplicate messages."""
# NOTE: This value is considered can be a configuration item, but

View File

@ -68,7 +68,7 @@ def keyify(address, service=SERVICE_RPC):
return "String:{%s}" % address
class Addresser(object):
class Addresser:
"""Base class message bus address generator. Used to convert an
oslo.messaging address into an AMQP 1.0 address string used over the
connection to the message bus.
@ -118,7 +118,7 @@ class LegacyAddresser(Addresser):
"""
def __init__(self, default_exchange, server_prefix, broadcast_prefix,
group_prefix, vhost):
super(LegacyAddresser, self).__init__(default_exchange)
super().__init__(default_exchange)
self._server_prefix = server_prefix
self._broadcast_prefix = broadcast_prefix
self._group_prefix = group_prefix
@ -181,7 +181,7 @@ class RoutableAddresser(Addresser):
def __init__(self, default_exchange, rpc_exchange, rpc_prefix,
notify_exchange, notify_prefix, unicast_tag, multicast_tag,
anycast_tag, vhost):
super(RoutableAddresser, self).__init__(default_exchange)
super().__init__(default_exchange)
if not self._default_exchange:
self._default_exchange = "openstack"
@ -260,7 +260,7 @@ class RoutableAddresser(Addresser):
else self._notify_prefix)
class AddresserFactory(object):
class AddresserFactory:
"""Generates the proper Addresser based on configuration and the type of
message bus the driver is connected to.
"""

View File

@ -52,7 +52,7 @@ from oslo_messaging import transport
LOG = logging.getLogger(__name__)
class Task(object):
class Task:
"""Run a command on the eventloop thread, wait until it completes
"""
@ -74,7 +74,7 @@ class SubscribeTask(Task):
arriving from the target are given to the listener.
"""
def __init__(self, target, listener, notifications=False):
super(SubscribeTask, self).__init__()
super().__init__()
self._target = target() # mutable - need a copy
self._subscriber_id = listener.id
self._in_queue = listener.incoming
@ -95,7 +95,7 @@ class SendTask(Task):
"""
def __init__(self, name, message, target, deadline, retry,
wait_for_ack, notification=False):
super(SendTask, self).__init__()
super().__init__()
self.name = name
# note: target can be either a Target class or a string
# target is mutable - make copy
@ -195,18 +195,18 @@ class RPCCallTask(SendTask):
the destination.
"""
def __init__(self, target, message, deadline, retry, wait_for_ack):
super(RPCCallTask, self).__init__("RPC Call", message, target,
deadline, retry, wait_for_ack)
super().__init__("RPC Call", message, target,
deadline, retry, wait_for_ack)
self._reply_link = None
self._reply_msg = None
self._msg_id = None
def wait(self):
error = super(RPCCallTask, self).wait()
error = super().wait()
return error or self._reply_msg
def _prepare(self, sender):
super(RPCCallTask, self)._prepare(sender)
super()._prepare(sender)
# reserve a message id for mapping the received response
if self._msg_id:
# already set so this is a re-transmit. To be safe cancel the old
@ -224,7 +224,7 @@ class RPCCallTask(SendTask):
def _on_ack(self, state, info):
if state != pyngus.SenderLink.ACCEPTED:
super(RPCCallTask, self)._on_ack(state, info)
super()._on_ack(state, info)
# must wait for reply if ACCEPTED
def _cleanup(self):
@ -232,7 +232,7 @@ class RPCCallTask(SendTask):
self._reply_link.cancel_response(self._msg_id)
self._msg_id = None
self._reply_link = None
super(RPCCallTask, self)._cleanup()
super()._cleanup()
class RPCMonitoredCallTask(RPCCallTask):
@ -243,8 +243,8 @@ class RPCMonitoredCallTask(RPCCallTask):
"""
def __init__(self, target, message, deadline, call_monitor_timeout,
retry, wait_for_ack):
super(RPCMonitoredCallTask, self).__init__(target, message, deadline,
retry, wait_for_ack)
super().__init__(target, message, deadline,
retry, wait_for_ack)
assert call_monitor_timeout is not None # nosec
self._monitor_timeout = call_monitor_timeout
self._monitor_timer = None
@ -254,7 +254,7 @@ class RPCMonitoredCallTask(RPCCallTask):
self._set_alarm = controller.processor.defer
self._monitor_timer = self._set_alarm(self._call_timeout,
self._monitor_timeout)
super(RPCMonitoredCallTask, self)._execute(controller)
super()._execute(controller)
def _call_timeout(self):
# monitor_timeout expired
@ -274,14 +274,14 @@ class RPCMonitoredCallTask(RPCCallTask):
self._monitor_timer = self._set_alarm(self._call_timeout,
self._monitor_timeout)
else:
super(RPCMonitoredCallTask, self)._on_reply(message)
super()._on_reply(message)
def _cleanup(self):
self._set_alarm = None
if self._monitor_timer:
self._monitor_timer.cancel()
self._monitor_timer = None
super(RPCMonitoredCallTask, self)._cleanup()
super()._cleanup()
class MessageDispositionTask(Task):
@ -289,7 +289,7 @@ class MessageDispositionTask(Task):
for a Server
"""
def __init__(self, disposition, released=False):
super(MessageDispositionTask, self).__init__()
super().__init__()
self._disposition = disposition
self._released = released
@ -311,7 +311,7 @@ class Sender(pyngus.SenderEventHandler):
"""A link for sending to a particular destination on the message bus.
"""
def __init__(self, destination, scheduler, delay, service):
super(Sender, self).__init__()
super().__init__()
self._destination = destination
self._service = service
self._address = None
@ -537,8 +537,8 @@ class Sender(pyngus.SenderEventHandler):
self._send(self._pending_sends.popleft())
def _open_link(self):
name = "openstack.org/om/sender/[%s]/%s" % (self._address,
uuid.uuid4().hex)
name = "openstack.org/om/sender/[{}]/{}".format(self._address,
uuid.uuid4().hex)
link = self._connection.create_sender(name=name,
source_address=self._address,
target_address=self._address,
@ -685,7 +685,8 @@ class Server(pyngus.ReceiverEventHandler):
"""
self._connection = connection
for a in self._addresses:
name = "openstack.org/om/receiver/[%s]/%s" % (a, uuid.uuid4().hex)
name = "openstack.org/om/receiver/[{}]/{}".format(
a, uuid.uuid4().hex)
r = self._open_link(a, name)
self._receivers.append(r)
@ -786,8 +787,8 @@ class Server(pyngus.ReceiverEventHandler):
class RPCServer(Server):
"""Subscribes to RPC addresses"""
def __init__(self, target, incoming, scheduler, delay, capacity):
super(RPCServer, self).__init__(target, incoming, scheduler, delay,
capacity)
super().__init__(target, incoming, scheduler, delay,
capacity)
def attach(self, connection, addresser):
# Generate the AMQP 1.0 addresses for the base class
@ -797,14 +798,14 @@ class RPCServer(Server):
addresser.anycast_address(self._target, SERVICE_RPC)
]
# now invoke the base class with the generated addresses
super(RPCServer, self).attach(connection)
super().attach(connection)
class NotificationServer(Server):
"""Subscribes to Notification addresses"""
def __init__(self, target, incoming, scheduler, delay, capacity):
super(NotificationServer, self).__init__(target, incoming, scheduler,
delay, capacity)
super().__init__(target, incoming, scheduler,
delay, capacity)
def attach(self, connection, addresser):
# Generate the AMQP 1.0 addresses for the base class
@ -812,10 +813,10 @@ class NotificationServer(Server):
addresser.anycast_address(self._target, SERVICE_NOTIFY)
]
# now invoke the base class with the generated addresses
super(NotificationServer, self).attach(connection)
super().attach(connection)
class Hosts(object):
class Hosts:
"""An order list of TransportHost addresses. Connection failover progresses
from one host to the next. The default realm comes from the configuration
and is only used if no realm is present in the URL.

View File

@ -44,7 +44,7 @@ def compute_timeout(offset):
return math.ceil(time.monotonic() + offset)
class _SocketConnection(object):
class _SocketConnection:
"""Associates a pyngus Connection with a python network socket,
and handles all connection-related I/O and timer events.
"""
@ -71,7 +71,7 @@ class _SocketConnection(object):
try:
pyngus.read_socket_input(self.pyngus_conn, self.socket)
self.pyngus_conn.process(time.monotonic())
except (socket.timeout, socket.error) as e:
except (socket.timeout, OSError) as e:
# pyngus handles EAGAIN/EWOULDBLOCK and EINTER
self.pyngus_conn.close_input()
self.pyngus_conn.close_output()
@ -83,7 +83,7 @@ class _SocketConnection(object):
try:
pyngus.write_socket_output(self.pyngus_conn, self.socket)
self.pyngus_conn.process(time.monotonic())
except (socket.timeout, socket.error) as e:
except (socket.timeout, OSError) as e:
# pyngus handles EAGAIN/EWOULDBLOCK and EINTER
self.pyngus_conn.close_output()
self.pyngus_conn.close_input()
@ -104,7 +104,7 @@ class _SocketConnection(object):
my_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
try:
my_socket.connect(addr[0][4])
except socket.error as e:
except OSError as e:
if e.errno != errno.EINPROGRESS:
error = "Socket connect failure '%s'" % str(e)
LOG.error("Socket connect failure '%s'", str(e))
@ -159,10 +159,10 @@ class _SocketConnection(object):
self.socket = None
class Scheduler(object):
class Scheduler:
"""Schedule callables to be run in the future.
"""
class Event(object):
class Event:
# simply hold a reference to a callback that can be set to None if the
# alarm is canceled
def __init__(self, callback):
@ -229,7 +229,7 @@ class Scheduler(object):
pass
class Requests(object):
class Requests:
"""A queue of callables to execute from the eventloop thread's main
loop.
"""
@ -273,7 +273,7 @@ class Thread(threading.Thread):
threads.
"""
def __init__(self, container_name, node, command, pid):
super(Thread, self).__init__()
super().__init__()
# callables from other threads:
self._requests = Requests()
@ -325,7 +325,8 @@ class Thread(threading.Thread):
def connect(self, host, handler, properties):
"""Get a _SocketConnection to a peer represented by url."""
key = "openstack.org/om/connection/%s:%s/" % (host.hostname, host.port)
key = "openstack.org/om/connection/{}:{}/".format(
host.hostname, host.port)
# return pre-existing
conn = self._container.get_connection(key)
if conn:
@ -379,7 +380,7 @@ class Thread(threading.Thread):
# and now we wait...
try:
select.select(readfds, writefds, [], timeout)
except select.error as serror:
except OSError as serror:
if serror[0] == errno.EINTR:
LOG.warning("ignoring interrupt from select(): %s",
str(serror))

View File

@ -1,4 +1,3 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -42,7 +41,7 @@ ACK_REQUEUE_EVERY_SECONDS_MIN = 0.001
ACK_REQUEUE_EVERY_SECONDS_MAX = 5.0
class QManager(object):
class QManager:
"""Queue Manager to build queue name for reply (and fanout) type.
This class is used only when use_queue_manager is set to True in config
file.
@ -61,8 +60,9 @@ class QManager(object):
self.hostname = hostname
self.processname = processname
# This is where the counter is kept
self.file_name = '/dev/shm/%s_%s_qmanager' % (self.hostname, # nosec
self.processname)
self.file_name = '/dev/shm/{}_{}_qmanager'.format( # nosec
self.hostname,
self.processname)
# We use the process group to restart the counter on service restart
self.pg = os.getpgrp()
@ -70,18 +70,19 @@ class QManager(object):
# parse start time (in jiffies) since system boot
#
# https://www.man7.org/linux/man-pages//man5/proc_pid_stat.5.html
with open(f'/proc/{self.pg}/stat', 'r') as f:
with open(f'/proc/{self.pg}/stat') as f:
self.start_time = int(f.read().split()[21])
def get(self):
lock_name = 'oslo_read_shm_%s_%s' % (self.hostname, self.processname)
lock_name = 'oslo_read_shm_{}_{}'.format(
self.hostname, self.processname)
@lockutils.synchronized(lock_name, external=True)
def read_from_shm():
# Grab the counter from shm
# This function is thread and process safe thanks to lockutils
try:
with open(self.file_name, 'r') as f:
with open(self.file_name) as f:
pg, counter, start_time = f.readline().split(':')
pg = int(pg)
counter = int(counter)
@ -110,14 +111,14 @@ class QManager(object):
return self.hostname + ":" + self.processname + ":" + str(counter)
class MessageOperationsHandler(object):
class MessageOperationsHandler:
"""Queue used by message operations to ensure that all tasks are
serialized and run in the same thread, since underlying drivers like kombu
are not thread safe.
"""
def __init__(self, name):
self.name = "%s (%s)" % (name, hex(id(self)))
self.name = "{} ({})".format(name, hex(id(self)))
self._tasks = queue.Queue()
self._shutdown = eventletutils.Event()
@ -159,7 +160,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q,
client_timeout, obsolete_reply_queues,
message_operations_handler):
super(AMQPIncomingMessage, self).__init__(ctxt, message, msg_id)
super().__init__(ctxt, message, msg_id)
self.orig_msg_id = msg_id
self.listener = listener
@ -317,7 +318,7 @@ class NotificationAMQPIncomingMessage(AMQPIncomingMessage):
self._message_operations_handler.do(_do_requeue)
class ObsoleteReplyQueuesCache(object):
class ObsoleteReplyQueuesCache:
"""Cache of reply queue id that doesn't exist anymore.
NOTE(sileht): In case of a broker restart/failover
@ -365,7 +366,7 @@ class AMQPListener(base.PollStyleListener):
use_cache = False
def __init__(self, driver, conn):
super(AMQPListener, self).__init__(driver.prefetch_size)
super().__init__(driver.prefetch_size)
self.driver = driver
self.conn = conn
self.msg_id_cache = rpc_amqp._MsgIdCache()
@ -493,14 +494,14 @@ class RpcAMQPListener(AMQPListener):
# succeeds there is no guarantee the broker actually gets the ACK
# since acknowledge() simply writes the ACK to the socket (there is
# no ACK confirmation coming back from the broker)
super(RpcAMQPListener, self).__call__(message)
super().__call__(message)
class NotificationAMQPListener(AMQPListener):
message_cls = NotificationAMQPIncomingMessage
class ReplyWaiters(object):
class ReplyWaiters:
def __init__(self):
self._queues = {}
@ -547,7 +548,7 @@ class ReplyWaiters(object):
del self._queues[msg_id]
class ReplyWaiter(object):
class ReplyWaiter:
def __init__(self, reply_q, conn, allowed_remote_exmods):
self.conn = conn
self.allowed_remote_exmods = allowed_remote_exmods
@ -675,8 +676,8 @@ class AMQPDriverBase(base.BaseDriver):
def __init__(self, conf, url, connection_pool,
default_exchange=None, allowed_remote_exmods=None):
super(AMQPDriverBase, self).__init__(conf, url, default_exchange,
allowed_remote_exmods)
super().__init__(conf, url, default_exchange,
allowed_remote_exmods)
self._default_exchange = default_exchange
@ -768,15 +769,15 @@ class AMQPDriverBase(base.BaseDriver):
'topic': target.topic})
conn.notify_send(exchange, target.topic, msg, retry=retry)
elif target.fanout:
log_msg += "FANOUT topic '%(topic)s'" % {
'topic': target.topic}
log_msg += "FANOUT topic '{topic}'".format(
topic=target.topic)
LOG.debug(log_msg)
conn.fanout_send(target.topic, msg, retry=retry)
else:
topic = target.topic
exchange = self._get_exchange(target)
if target.server:
topic = '%s.%s' % (target.topic, target.server)
topic = '{}.{}'.format(target.topic, target.server)
LOG.debug(log_msg + "exchange '%(exchange)s'"
" topic '%(topic)s'", {'exchange': exchange,
'topic': topic})
@ -813,8 +814,8 @@ class AMQPDriverBase(base.BaseDriver):
topic=target.topic,
callback=listener)
conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
topic='%s.%s' % (target.topic,
target.server),
topic='{}.{}'.format(target.topic,
target.server),
callback=listener)
conn.declare_fanout_consumer(target.topic, listener)
@ -829,7 +830,7 @@ class AMQPDriverBase(base.BaseDriver):
for target, priority in targets_and_priorities:
conn.declare_topic_consumer(
exchange_name=self._get_exchange(target),
topic='%s.%s' % (target.topic, priority),
topic='{}.{}'.format(target.topic, priority),
callback=listener, queue_name=pool)
return base.PollStyleListenerAdapter(listener, batch_size,
batch_timeout)

View File

@ -66,7 +66,7 @@ class TransportDriverError(exceptions.MessagingException):
"""Base class for transport driver specific exceptions."""
class IncomingMessage(object, metaclass=abc.ABCMeta):
class IncomingMessage(metaclass=abc.ABCMeta):
"""The IncomingMessage class represents a single message received from the
messaging backend. Instances of this class are passed to up a server's
messaging processing logic. The backend driver must provide a concrete
@ -173,7 +173,7 @@ class RpcIncomingMessage(IncomingMessage, metaclass=abc.ABCMeta):
"""
class PollStyleListener(object, metaclass=abc.ABCMeta):
class PollStyleListener(metaclass=abc.ABCMeta):
"""A PollStyleListener is used to transfer received messages to a server
for processing. A polling pattern is used to retrieve messages. A
PollStyleListener uses a separate thread to run the polling loop. A
@ -229,7 +229,7 @@ class PollStyleListener(object, metaclass=abc.ABCMeta):
pass
class Listener(object, metaclass=abc.ABCMeta):
class Listener(metaclass=abc.ABCMeta):
"""A Listener is used to transfer incoming messages from the driver to a
server for processing. A callback is used by the driver to transfer the
messages.
@ -287,7 +287,7 @@ class PollStyleListenerAdapter(Listener):
"""
def __init__(self, poll_style_listener, batch_size, batch_timeout):
super(PollStyleListenerAdapter, self).__init__(
super().__init__(
batch_size, batch_timeout, poll_style_listener.prefetch_size
)
self._poll_style_listener = poll_style_listener
@ -296,7 +296,7 @@ class PollStyleListenerAdapter(Listener):
self._started = False
def start(self, on_incoming_callback):
super(PollStyleListenerAdapter, self).start(on_incoming_callback)
super().start(on_incoming_callback)
self._started = True
self._listen_thread.start()
@ -323,13 +323,13 @@ class PollStyleListenerAdapter(Listener):
self._started = False
self._poll_style_listener.stop()
self._listen_thread.join()
super(PollStyleListenerAdapter, self).stop()
super().stop()
def cleanup(self):
self._poll_style_listener.cleanup()
class BaseDriver(object, metaclass=abc.ABCMeta):
class BaseDriver(metaclass=abc.ABCMeta):
"""Defines the backend driver interface. Each backend driver implementation
must provide a concrete derivation of this class implementing the backend
specific logic for its public methods.

View File

@ -91,7 +91,7 @@ class RPCException(Exception):
# at least get the core message out if something happened
message = self.msg_fmt
super(RPCException, self).__init__(message)
super().__init__(message)
class Timeout(RPCException):
@ -115,7 +115,7 @@ class Timeout(RPCException):
self.info = info
self.topic = topic
self.method = method
super(Timeout, self).__init__(
super().__init__(
None,
info=info or '<unknown>',
topic=topic or '<unknown>',
@ -144,7 +144,7 @@ class RpcVersionCapError(RPCException):
msg_fmt = "Specified RPC version cap, %(version_cap)s, is too low"
class Connection(object):
class Connection:
"""A connection, returned by rpc.create_connection().
This class represents a connection to the message bus used for rpc.
@ -235,7 +235,7 @@ def deserialize_remote_exception(data, allowed_remote_exmods):
str_override = lambda self: message
new_ex_type = type(ex_type.__name__ + _REMOTE_POSTFIX, (ex_type,),
{'__str__': str_override, '__unicode__': str_override})
new_ex_type.__module__ = '%s%s' % (module, _REMOTE_POSTFIX)
new_ex_type.__module__ = '{}{}'.format(module, _REMOTE_POSTFIX)
try:
# NOTE(ameade): Dynamically create a new exception type and swap it in
# as the new type for the exception. This only works on user defined
@ -250,7 +250,7 @@ def deserialize_remote_exception(data, allowed_remote_exmods):
return failure
class CommonRpcContext(object):
class CommonRpcContext:
def __init__(self, **kwargs):
self.values = kwargs
@ -339,7 +339,7 @@ def deserialize_msg(msg):
return raw_msg
class DecayingTimer(object):
class DecayingTimer:
def __init__(self, duration=None):
self._watch = timeutils.StopWatch(duration=duration)

View File

@ -110,7 +110,7 @@ def unmarshal_request(message):
class ProtonIncomingMessage(base.RpcIncomingMessage):
def __init__(self, listener, message, disposition):
request, ctxt, client_timeout = unmarshal_request(message)
super(ProtonIncomingMessage, self).__init__(ctxt, request)
super().__init__(ctxt, request)
self.listener = listener
self.client_timeout = client_timeout
self._reply_to = message.reply_to
@ -170,7 +170,7 @@ class ProtonIncomingMessage(base.RpcIncomingMessage):
@removals.removed_class("Queue")
class Queue(object):
class Queue:
def __init__(self):
self._queue = collections.deque()
self._lock = threading.Lock()
@ -202,7 +202,7 @@ class Queue(object):
@removals.removed_class("ProtonListener")
class ProtonListener(base.PollStyleListener):
def __init__(self, driver):
super(ProtonListener, self).__init__(driver.prefetch_size)
super().__init__(driver.prefetch_size)
self.driver = driver
self.incoming = Queue()
self.id = uuid.uuid4().hex
@ -232,8 +232,8 @@ class ProtonDriver(base.BaseDriver):
if proton is None or controller is None:
raise NotImplementedError("Proton AMQP C libraries not installed")
super(ProtonDriver, self).__init__(conf, url, default_exchange,
allowed_remote_exmods)
super().__init__(conf, url, default_exchange,
allowed_remote_exmods)
opt_group = cfg.OptGroup(name='oslo_messaging_amqp',
title='AMQP 1.0 driver options')
@ -429,7 +429,7 @@ class ProtonDriver(base.BaseDriver):
# this is how the destination target is created by the notifier,
# see MessagingDriver.notify in oslo_messaging/notify/messaging.py
for target, priority in targets_and_priorities:
topic = '%s.%s' % (target.topic, priority)
topic = '{}.{}'.format(target.topic, priority)
# Sooo... the exchange is simply discarded? (see above comment)
task = controller.SubscribeTask(Target(topic=topic),
listener, notifications=True)

View File

@ -27,7 +27,7 @@ from oslo_messaging._drivers import base
class FakeIncomingMessage(base.RpcIncomingMessage):
def __init__(self, ctxt, message, reply_q, requeue):
super(FakeIncomingMessage, self).__init__(ctxt, message)
super().__init__(ctxt, message)
self.requeue_callback = requeue
self._reply_q = reply_q
@ -46,7 +46,7 @@ class FakeIncomingMessage(base.RpcIncomingMessage):
class FakeListener(base.PollStyleListener):
def __init__(self, exchange_manager, targets, pool=None):
super(FakeListener, self).__init__()
super().__init__()
self._exchange_manager = exchange_manager
self._targets = targets
self._pool = pool
@ -87,7 +87,7 @@ class FakeListener(base.PollStyleListener):
self._stopped.set()
class FakeExchange(object):
class FakeExchange:
def __init__(self, name):
self.name = name
@ -145,7 +145,7 @@ class FakeExchange(object):
return queue.pop(0) if queue else (None, None, None, None)
class FakeExchangeManager(object):
class FakeExchangeManager:
_exchanges_lock = threading.Lock()
_exchanges = {}
@ -173,8 +173,8 @@ class FakeDriver(base.BaseDriver):
def __init__(self, conf, url, default_exchange=None,
allowed_remote_exmods=None):
super(FakeDriver, self).__init__(conf, url, default_exchange,
allowed_remote_exmods)
super().__init__(conf, url, default_exchange,
allowed_remote_exmods)
self._exchange_manager = FakeExchangeManager(default_exchange)
@ -248,7 +248,7 @@ class FakeDriver(base.BaseDriver):
batch_size, batch_timeout):
targets = [
oslo_messaging.Target(
topic='%s.%s' % (target.topic, priority),
topic='{}.{}'.format(target.topic, priority),
exchange=target.exchange)
for target, priority in targets_and_priorities]
listener = FakeListener(self._exchange_manager, targets, pool)

View File

@ -77,12 +77,12 @@ class ConsumerTimeout(KafkaException):
pass
class AssignedPartition(object):
class AssignedPartition:
"""This class is used by the ConsumerConnection to track the
assigned partitions.
"""
def __init__(self, topic, partition):
super(AssignedPartition, self).__init__()
super().__init__()
self.topic = topic
self.partition = partition
self.skey = '%s %d' % (self.topic, self.partition)
@ -91,7 +91,7 @@ class AssignedPartition(object):
return {'topic': self.topic, 'partition': self.partition}
class Connection(object):
class Connection:
"""This is the base class for consumer and producer connections for
transport attributes.
"""
@ -126,8 +126,8 @@ class Connection(object):
LOG.warning("Different transport usernames detected")
if host.hostname:
hostaddr = "%s:%s" % (netutils.escape_ipv6(host.hostname),
host.port)
hostaddr = "{}:{}".format(netutils.escape_ipv6(host.hostname),
host.port)
self.hostaddrs.append(hostaddr)
@ -141,7 +141,7 @@ class ConsumerConnection(Connection):
"""
def __init__(self, conf, url):
super(ConsumerConnection, self).__init__(conf, url)
super().__init__(conf, url)
self.consumer = None
self.consumer_timeout = self.driver_conf.kafka_consumer_timeout
self.max_fetch_bytes = self.driver_conf.kafka_max_fetch_bytes
@ -262,7 +262,7 @@ class ProducerConnection(Connection):
def __init__(self, conf, url):
super(ProducerConnection, self).__init__(conf, url)
super().__init__(conf, url)
self.batch_size = self.driver_conf.producer_batch_size
self.linger_ms = self.driver_conf.producer_batch_timeout * 1000
self.compression_codec = self.driver_conf.compression_codec
@ -356,7 +356,7 @@ class ProducerConnection(Connection):
class OsloKafkaMessage(base.RpcIncomingMessage):
def __init__(self, ctxt, message):
super(OsloKafkaMessage, self).__init__(ctxt, message)
super().__init__(ctxt, message)
def requeue(self):
LOG.warning("requeue is not supported")
@ -371,7 +371,7 @@ class OsloKafkaMessage(base.RpcIncomingMessage):
class KafkaListener(base.PollStyleListener):
def __init__(self, conn):
super(KafkaListener, self).__init__()
super().__init__()
self._stopped = eventletutils.Event()
self.conn = conn
self.incoming_queue = []
@ -411,7 +411,7 @@ class KafkaDriver(base.BaseDriver):
def __init__(self, conf, url, default_exchange=None,
allowed_remote_exmods=None):
conf = kafka_options.register_opts(conf, url)
super(KafkaDriver, self).__init__(
super().__init__(
conf, url, default_exchange, allowed_remote_exmods)
self.listeners = []

View File

@ -367,7 +367,7 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl,
class RabbitMessage(dict):
def __init__(self, raw_message):
super(RabbitMessage, self).__init__(
super().__init__(
rpc_common.deserialize_msg(raw_message.payload))
LOG.trace('RabbitMessage.Init: message %s', self)
self._raw_message = raw_message
@ -381,7 +381,7 @@ class RabbitMessage(dict):
self._raw_message.requeue()
class Consumer(object):
class Consumer:
"""Consumer class."""
def __init__(self, exchange_name, queue_name, routing_key, type, durable,
@ -724,7 +724,7 @@ class ConnectionLock(DummyConnectionLock):
self.release()
class Connection(object):
class Connection:
"""Connection object."""
def __init__(self, conf, url, purpose, retry=None):
@ -1006,7 +1006,7 @@ class Connection(object):
default_password='', default_hostname=''):
transport = url.transport.replace('kombu+', '')
transport = transport.replace('rabbit', 'amqp')
return '%s://%s:%s@%s:%s/%s' % (
return '{}://{}:{}@{}:{}/{}'.format(
transport,
parse.quote(host.username or default_username),
parse.quote(host.password or default_password),
@ -1309,7 +1309,7 @@ class Connection(object):
sock.setsockopt(socket.IPPROTO_TCP,
TCP_USER_TIMEOUT,
int(math.ceil(timeout)))
except socket.error as error:
except OSError as error:
code = error[0]
# TCP_USER_TIMEOUT not defined on kernels <2.6.37
if code != errno.ENOPROTOOPT:
@ -1527,7 +1527,7 @@ class Connection(object):
unique = self._q_manager.get()
else:
unique = uuid.uuid4().hex
queue_name = '%s_fanout_%s' % (topic, unique)
queue_name = '{}_fanout_{}'.format(topic, unique)
LOG.debug('Creating fanout queue: %s', queue_name)
is_durable = (self.rabbit_transient_quorum_queue or
@ -1573,8 +1573,8 @@ class Connection(object):
# the connection's socket while it is in an error state will cause
# py-amqp to attempt reconnecting.
ci = self.connection.info()
info = dict([(k, ci.get(k)) for k in
['hostname', 'port', 'transport']])
info = {k: ci.get(k) for k in
['hostname', 'port', 'transport']}
client_port = None
if (not conn_error and self.channel and
hasattr(self.channel.connection, 'sock') and
@ -1788,7 +1788,7 @@ class RabbitDriver(amqpdriver.AMQPDriverBase):
conf, max_size, min_size, ttl,
url, Connection)
super(RabbitDriver, self).__init__(
super().__init__(
conf, url,
connection_pool,
default_exchange,

View File

@ -24,7 +24,7 @@ from oslo_messaging._drivers import common
LOG = logging.getLogger(__name__)
class Pool(object, metaclass=abc.ABCMeta):
class Pool(metaclass=abc.ABCMeta):
"""A thread-safe object pool.
Modelled after the eventlet.pools.Pool interface, but designed to be safe
@ -35,7 +35,7 @@ class Pool(object, metaclass=abc.ABCMeta):
"""
def __init__(self, max_size=4, min_size=2, ttl=1200, on_expire=None):
super(Pool, self).__init__()
super().__init__()
self._min_size = min_size
self._max_size = max_size
self._item_ttl = ttl
@ -122,8 +122,8 @@ class ConnectionPool(Pool):
self.connection_cls = connection_cls
self.conf = conf
self.url = url
super(ConnectionPool, self).__init__(max_size, min_size, ttl,
self._on_expire)
super().__init__(max_size, min_size, ttl,
self._on_expire)
def _on_expire(self, connection):
connection.close()

View File

@ -1,4 +1,3 @@
# Copyright 2020 LINE Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -65,7 +64,7 @@ oslo_messaging_metrics = [
cfg.CONF.register_opts(oslo_messaging_metrics, group='oslo_messaging_metrics')
class MetricsCollectorClient(object):
class MetricsCollectorClient:
def __init__(self, conf, metrics_type, **kwargs):
self.conf = conf.oslo_messaging_metrics

View File

@ -1,4 +1,3 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -70,7 +69,7 @@ def version_is_compatible(imp_version, version):
return True
class DummyLock(object):
class DummyLock:
def acquire(self):
pass

View File

@ -1,4 +1,3 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -120,7 +119,7 @@ class ConfFixture(fixtures.Fixture):
self.conf.clear_override = self.conf.clear_override.wrapped
def setUp(self):
super(ConfFixture, self).setUp()
super().setUp()
self._setup_decorator()
self.addCleanup(self._teardown_decorator)
self.addCleanup(self.conf.reset)

View File

@ -1,4 +1,3 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@ -19,7 +18,7 @@ __all__ = [
]
class DispatcherBase(object, metaclass=abc.ABCMeta):
class DispatcherBase(metaclass=abc.ABCMeta):
"Base class for dispatcher"
@abc.abstractmethod

View File

@ -1,4 +1,3 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -34,7 +33,7 @@ class InvalidTarget(MessagingException, ValueError):
def __init__(self, msg, target):
msg = msg + ":" + str(target)
super(InvalidTarget, self).__init__(msg)
super().__init__(msg)
self.target = target
@ -42,7 +41,7 @@ class MessageUndeliverable(Exception):
"""Raised if message is not routed with mandatory flag"""
def __init__(self, exception, exchange, routing_key, message):
super(MessageUndeliverable, self).__init__()
super().__init__()
self.exception = exception
self.exchange = exchange
self.routing_key = routing_key

View File

@ -91,7 +91,7 @@ class CheckForLoggingIssues(BaseASTChecker):
version = '1.0'
def __init__(self, tree, filename):
super(CheckForLoggingIssues, self).__init__(tree, filename)
super().__init__(tree, filename)
self.logger_names = []
self.logger_module_names = []
@ -120,13 +120,13 @@ class CheckForLoggingIssues(BaseASTChecker):
def visit_Import(self, node):
for alias in node.names:
self._filter_imports(alias.name, alias)
return super(CheckForLoggingIssues, self).generic_visit(node)
return super().generic_visit(node)
def visit_ImportFrom(self, node):
for alias in node.names:
full_name = '%s.%s' % (node.module, alias.name)
full_name = '{}.{}'.format(node.module, alias.name)
self._filter_imports(full_name, alias)
return super(CheckForLoggingIssues, self).generic_visit(node)
return super().generic_visit(node)
def _find_name(self, node):
"""Return the fully qualified name or a Name or Attribute."""
@ -155,7 +155,7 @@ class CheckForLoggingIssues(BaseASTChecker):
if (len(node.targets) != 1 or
not isinstance(node.targets[0], attr_node_types)):
# say no to: "x, y = ..."
return super(CheckForLoggingIssues, self).generic_visit(node)
return super().generic_visit(node)
target_name = self._find_name(node.targets[0])
@ -170,17 +170,17 @@ class CheckForLoggingIssues(BaseASTChecker):
if not isinstance(node.value, ast.Call):
# node.value must be a call to getLogger
self.assignments.pop(target_name, None)
return super(CheckForLoggingIssues, self).generic_visit(node)
return super().generic_visit(node)
if isinstance(node.value.func, ast.Name):
self.assignments[target_name] = node.value.func.id
return super(CheckForLoggingIssues, self).generic_visit(node)
return super().generic_visit(node)
if (not isinstance(node.value.func, ast.Attribute) or
not isinstance(node.value.func.value, attr_node_types)):
# function must be an attribute on an object like
# logging.getLogger
return super(CheckForLoggingIssues, self).generic_visit(node)
return super().generic_visit(node)
object_name = self._find_name(node.value.func.value)
func_name = node.value.func.attr
@ -189,7 +189,7 @@ class CheckForLoggingIssues(BaseASTChecker):
func_name == 'getLogger'):
self.logger_names.append(target_name)
return super(CheckForLoggingIssues, self).generic_visit(node)
return super().generic_visit(node)
def visit_Call(self, node):
"""Look for the 'LOG.*' calls."""
@ -202,7 +202,7 @@ class CheckForLoggingIssues(BaseASTChecker):
obj_name = self._find_name(node.func.value)
method_name = node.func.attr
else: # could be Subscript, Call or many more
return super(CheckForLoggingIssues, self).generic_visit(node)
return super().generic_visit(node)
# if dealing with a logger the method can't be "warn"
if obj_name in self.logger_names and method_name == 'warn':
@ -211,16 +211,16 @@ class CheckForLoggingIssues(BaseASTChecker):
# must be a logger instance and one of the support logging methods
if obj_name not in self.logger_names:
return super(CheckForLoggingIssues, self).generic_visit(node)
return super().generic_visit(node)
# the call must have arguments
if not node.args:
return super(CheckForLoggingIssues, self).generic_visit(node)
return super().generic_visit(node)
if method_name == 'debug':
self._process_debug(node)
return super(CheckForLoggingIssues, self).generic_visit(node)
return super().generic_visit(node)
def _process_debug(self, node):
msg = node.args[0] # first arg to a logging method is the msg

View File

@ -1,4 +1,3 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,3 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
# Copyright 2013 Red Hat, Inc.
@ -36,8 +35,8 @@ class LogDriver(notifier.Driver):
LOGGER_BASE = 'oslo.messaging.notification'
def notify(self, ctxt, message, priority, retry):
logger = logging.getLogger('%s.%s' % (self.LOGGER_BASE,
message['event_type']))
logger = logging.getLogger('{}.{}'.format(self.LOGGER_BASE,
message['event_type']))
method = getattr(logger, priority.lower(), None)
if method:
method(jsonutils.dumps(strutils.mask_dict_password(message)))

View File

@ -1,4 +1,3 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
# Copyright 2013 Red Hat, Inc.

View File

@ -50,7 +50,7 @@ class RoutingDriver(notifier.Driver):
def _get_notifier_config_file(self, filename):
"""Broken out for testing."""
return open(filename, 'r')
return open(filename)
def _load_notifiers(self):
"""One-time load of notifier config file."""

View File

@ -1,4 +1,3 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
# Copyright 2013 Red Hat, Inc.

View File

@ -27,7 +27,7 @@ LOG = logging.getLogger(__name__)
PRIORITIES = ['audit', 'debug', 'info', 'warn', 'error', 'critical', 'sample']
class NotificationResult(object):
class NotificationResult:
HANDLED = 'handled'
REQUEUE = 'requeue'

View File

@ -16,7 +16,7 @@
import re
class NotificationFilter(object):
class NotificationFilter:
r"""Filter notification messages
@ -55,7 +55,7 @@ class NotificationFilter(object):
def _build_regex_dict(regex_list):
if regex_list is None:
return {}
return dict((k, re.compile(regex_list[k])) for k in regex_list)
return {k: re.compile(regex_list[k]) for k in regex_list}
@staticmethod
def _check_for_single_mismatch(data, regex):

View File

@ -147,8 +147,8 @@ class NotificationServerBase(msg_server.MessageHandlingServer):
def __init__(self, transport, targets, dispatcher, executor=None,
allow_requeue=True, pool=None, batch_size=1,
batch_timeout=None):
super(NotificationServerBase, self).__init__(transport, dispatcher,
executor)
super().__init__(transport, dispatcher,
executor)
self._allow_requeue = allow_requeue
self._pool = pool
self.targets = targets
@ -175,7 +175,7 @@ class NotificationServer(NotificationServerBase):
"get_notification_transport to obtain a "
"notification transport instance.")
super(NotificationServer, self).__init__(
super().__init__(
transport, targets, dispatcher, executor, allow_requeue, pool, 1,
None
)

View File

@ -1,4 +1,3 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
# Copyright 2013 Red Hat, Inc.
@ -67,13 +66,14 @@ class MessagingDriver(notifier.Driver):
"""
def __init__(self, conf, topics, transport, version=1.0):
super(MessagingDriver, self).__init__(conf, topics, transport)
super().__init__(conf, topics, transport)
self.version = version
def notify(self, ctxt, message, priority, retry):
priority = priority.lower()
for topic in self.topics:
target = oslo_messaging.Target(topic='%s.%s' % (topic, priority))
target = oslo_messaging.Target(
topic='{}.{}'.format(topic, priority))
try:
self.transport._send_notification(target, ctxt, message,
version=self.version,
@ -89,4 +89,4 @@ class MessagingV2Driver(MessagingDriver):
"Send notifications using the 2.0 message format."
def __init__(self, conf, **kwargs):
super(MessagingV2Driver, self).__init__(conf, version=2.0, **kwargs)
super().__init__(conf, version=2.0, **kwargs)

View File

@ -63,7 +63,7 @@ class RequestNotifier(base.Middleware):
self.service_name = conf.get('service_name')
self.ignore_req_list = [x.upper().strip() for x in
conf.get('ignore_req_list', '').split(',')]
super(RequestNotifier, self).__init__(app)
super().__init__(app)
@staticmethod
def environ_to_dict(environ):
@ -71,8 +71,8 @@ class RequestNotifier(base.Middleware):
include them.
"""
return dict((k, v) for k, v in environ.items()
if k.isupper() and k != 'HTTP_X_AUTH_TOKEN')
return {k: v for k, v in environ.items()
if k.isupper() and k != 'HTTP_X_AUTH_TOKEN'}
@log_and_ignore_error
def process_request(self, request):

View File

@ -1,4 +1,3 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
# Copyright 2013 Red Hat, Inc.
@ -105,7 +104,7 @@ def _send_notification():
notifier._notify({}, args.event_type, args.payload, args.priority)
class Driver(object, metaclass=abc.ABCMeta):
class Driver(metaclass=abc.ABCMeta):
"""Base driver for Notifications"""
def __init__(self, conf, topics, transport):
@ -189,7 +188,7 @@ def _sanitize_context(ctxt):
return {}
class Notifier(object):
class Notifier:
"""Send notification messages.
@ -468,7 +467,7 @@ class _SubNotifier(Notifier):
self._driver_mgr = self._base._driver_mgr
def _notify(self, ctxt, event_type, payload, priority):
super(_SubNotifier, self)._notify(ctxt, event_type, payload, priority)
super()._notify(ctxt, event_type, payload, priority)
@classmethod
def _prepare(cls, base, publisher_id=_marker, retry=_marker):

View File

@ -1,4 +1,3 @@
# Copyright 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,3 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -60,7 +60,7 @@ class RemoteError(exceptions.MessagingException):
msg = ("Remote error: %(exc_type)s %(value)s\n%(traceback)s." %
dict(exc_type=self.exc_type, value=self.value,
traceback=self.traceback))
super(RemoteError, self).__init__(msg)
super().__init__(msg)
class RPCVersionCapError(exceptions.MessagingException):
@ -73,20 +73,20 @@ class RPCVersionCapError(exceptions.MessagingException):
"in minor version as the specified version cap "
"%(version_cap)s." %
dict(version=self.version, version_cap=self.version_cap))
super(RPCVersionCapError, self).__init__(msg)
super().__init__(msg)
class ClientSendError(exceptions.MessagingException):
"""Raised if we failed to send a message to a target."""
def __init__(self, target, ex):
msg = 'Failed to send to target "%s": %s' % (target, ex)
super(ClientSendError, self).__init__(msg)
msg = 'Failed to send to target "{}": {}'.format(target, ex)
super().__init__(msg)
self.target = target
self.ex = ex
class _BaseCallContext(object, metaclass=abc.ABCMeta):
class _BaseCallContext(metaclass=abc.ABCMeta):
_marker = object()
@ -104,7 +104,7 @@ class _BaseCallContext(object, metaclass=abc.ABCMeta):
self.version_cap = version_cap
self.transport_options = transport_options
super(_BaseCallContext, self).__init__()
super().__init__()
def _make_message(self, ctxt, method, args):
msg = dict(method=method)
@ -227,8 +227,8 @@ class _CallContext(_BaseCallContext):
version=version,
server=server,
fanout=fanout)
kwargs = dict([(k, v) for k, v in kwargs.items()
if v is not cls._marker])
kwargs = {k: v for k, v in kwargs.items()
if v is not cls._marker}
target = call_context.target(**kwargs)
if timeout is cls._marker:
@ -398,7 +398,7 @@ class RPCClient(_BaseCallContext):
"get_rpc_transport to obtain an RPC transport "
"instance.")
super(RPCClient, self).__init__(
super().__init__(
transport, target, serializer, timeout, version_cap, retry,
call_monitor_timeout, transport_options
)

View File

@ -53,7 +53,7 @@ __all__ = [
LOG = logging.getLogger(__name__)
class PingEndpoint(object):
class PingEndpoint:
def oslo_rpc_server_ping(self, ctxt, **kwargs):
return 'pong'
@ -78,7 +78,7 @@ class NoSuchMethod(RPCDispatcherError, AttributeError):
def __init__(self, method):
msg = "Endpoint does not support RPC method %s" % method
super(NoSuchMethod, self).__init__(msg)
super().__init__(msg)
self.method = method
@ -88,13 +88,13 @@ class UnsupportedVersion(RPCDispatcherError):
def __init__(self, version, method=None):
msg = "Endpoint does not support RPC version %s" % version
if method:
msg = "%s. Attempted method: %s" % (msg, method)
super(UnsupportedVersion, self).__init__(msg)
msg = "{}. Attempted method: {}".format(msg, method)
super().__init__(msg)
self.version = version
self.method = method
class RPCAccessPolicyBase(object, metaclass=ABCMeta):
class RPCAccessPolicyBase(metaclass=ABCMeta):
"""Determines which endpoint methods may be invoked via RPC"""
@abstractmethod
@ -177,7 +177,7 @@ class RPCDispatcher(dispatcher.DispatcherBase):
" for namespace and version filtering. It must" + \
" be of type oslo_messaging.Target. Do not" + \
" define an Endpoint method named 'target'"
raise TypeError("%s: endpoint=%s" % (errmsg, ep))
raise TypeError("{}: endpoint={}".format(errmsg, ep))
# Check if we have an attribute named 'oslo_rpc_server_ping'
oslo_rpc_server_ping = getattr(ep, 'oslo_rpc_server_ping', None)
@ -186,7 +186,7 @@ class RPCDispatcher(dispatcher.DispatcherBase):
" attribute which can be use to ping the" + \
" endpoint. Please avoid using any oslo_* " + \
" naming."
LOG.warning("%s (endpoint=%s)" % (errmsg, ep))
LOG.warning("{} (endpoint={})".format(errmsg, ep))
self.endpoints = endpoints

View File

@ -140,7 +140,7 @@ LOG = logging.getLogger(__name__)
class RPCServer(msg_server.MessageHandlingServer):
def __init__(self, transport, target, dispatcher, executor=None):
super(RPCServer, self).__init__(transport, dispatcher, executor)
super().__init__(transport, dispatcher, executor)
if not isinstance(transport, msg_transport.RPCTransport):
LOG.warning("Using notification transport for RPC. Please use "
"get_rpc_transport to obtain an RPC transport "

View File

@ -21,7 +21,7 @@ from oslo_serialization import jsonutils
__all__ = ['Serializer', 'NoOpSerializer', 'JsonPayloadSerializer']
class Serializer(object, metaclass=abc.ABCMeta):
class Serializer(metaclass=abc.ABCMeta):
"""Generic (de-)serialization definition base class."""
@abc.abstractmethod

View File

@ -64,8 +64,8 @@ class ExecutorLoadFailure(MessagingServerError):
"""Raised if an executor can't be loaded."""
def __init__(self, executor, ex):
msg = 'Failed to load executor "%s": %s' % (executor, ex)
super(ExecutorLoadFailure, self).__init__(msg)
msg = 'Failed to load executor "{}": {}'.format(executor, ex)
super().__init__(msg)
self.executor = executor
self.ex = ex
@ -74,8 +74,8 @@ class ServerListenError(MessagingServerError):
"""Raised if we failed to listen on a target."""
def __init__(self, target, ex):
msg = 'Failed to listen on target "%s": %s' % (target, ex)
super(ServerListenError, self).__init__(msg)
msg = 'Failed to listen on target "{}": {}'.format(target, ex)
super().__init__(msg)
self.target = target
self.ex = ex
@ -84,7 +84,7 @@ class TaskTimeout(MessagingServerError):
"""Raised if we timed out waiting for a task to complete."""
class _OrderedTask(object):
class _OrderedTask:
"""A task which must be executed in a particular order.
A caller may wait for this task to complete by calling
@ -105,7 +105,7 @@ class _OrderedTask(object):
:param name: The name of this task. Used in log messages.
"""
super(_OrderedTask, self).__init__()
super().__init__()
self._name = name
self._cond = threading.Condition()
@ -158,7 +158,7 @@ class _OrderedTask(object):
`timeout_timer` expires while waiting.
"""
with self._cond:
msg = '%s is waiting for %s to complete' % (caller, self._name)
msg = '{} is waiting for {} to complete'.format(caller, self._name)
self._wait(lambda: not self.complete,
msg, log_after, timeout_timer)
@ -206,11 +206,11 @@ class _OrderedTask(object):
msg, log_after, timeout_timer)
class _OrderedTaskRunner(object):
class _OrderedTaskRunner:
"""Mixin for a class which executes ordered tasks."""
def __init__(self, *args, **kwargs):
super(_OrderedTaskRunner, self).__init__(*args, **kwargs)
super().__init__(*args, **kwargs)
# Get a list of methods on this object which have the _ordered
# attribute
@ -358,7 +358,7 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner,
self._started = False
super(MessageHandlingServer, self).__init__()
super().__init__()
def _on_incoming(self, incoming):
"""Handles on_incoming event

View File

@ -1,4 +1,3 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -14,7 +13,7 @@
# under the License.
class Target(object):
class Target:
"""Identifies the destination of messages.

View File

@ -69,7 +69,7 @@ def _wait_until(predicate, timeout):
class _ListenerThread(threading.Thread):
"""Run a blocking listener in a thread."""
def __init__(self, listener, msg_count, msg_ack=True):
super(_ListenerThread, self).__init__()
super().__init__()
self.listener = listener
self.msg_count = msg_count
self._msg_ack = msg_ack
@ -118,7 +118,7 @@ class _SlowResponder(_ListenerThread):
# an RPC listener that pauses delay seconds before replying
def __init__(self, listener, delay, msg_count=1):
self._delay = delay
super(_SlowResponder, self).__init__(listener, msg_count)
super().__init__(listener, msg_count)
def run(self):
LOG.debug("_SlowResponder started")
@ -140,7 +140,7 @@ class _CallMonitor(_ListenerThread):
def __init__(self, listener, delay, hb_count, msg_count=1):
self._delay = delay
self._hb_count = hb_count
super(_CallMonitor, self).__init__(listener, msg_count)
super().__init__(listener, msg_count)
def run(self):
LOG.debug("_CallMonitor started")
@ -167,7 +167,7 @@ class _CallMonitor(_ListenerThread):
class TestProtonDriverLoad(test_utils.BaseTestCase):
def setUp(self):
super(TestProtonDriverLoad, self).setUp()
super().setUp()
self.messaging_conf.transport_url = 'amqp://'
def test_driver_load(self):
@ -180,7 +180,7 @@ class _AmqpBrokerTestCase(test_utils.BaseTestCase):
"""Creates a single FakeBroker for use by the tests"""
@testtools.skipUnless(pyngus, "proton modules not present")
def setUp(self):
super(_AmqpBrokerTestCase, self).setUp()
super().setUp()
self._broker = FakeBroker(self.conf.oslo_messaging_amqp)
self._broker_addr = "amqp://%s:%d" % (self._broker.host,
self._broker.port)
@ -188,7 +188,7 @@ class _AmqpBrokerTestCase(test_utils.BaseTestCase):
self.conf, self._broker_addr)
def tearDown(self):
super(_AmqpBrokerTestCase, self).tearDown()
super().tearDown()
if self._broker:
self._broker.stop()
@ -197,7 +197,7 @@ class _AmqpBrokerTestCaseAuto(_AmqpBrokerTestCase):
"""Like _AmqpBrokerTestCase, but starts the broker"""
@testtools.skipUnless(pyngus, "proton modules not present")
def setUp(self):
super(_AmqpBrokerTestCaseAuto, self).setUp()
super().setUp()
self._broker.start()
@ -439,7 +439,7 @@ class TestAmqpSend(_AmqpBrokerTestCaseAuto):
"""Send back an exception generated at the listener"""
class _FailedResponder(_ListenerThread):
def __init__(self, listener):
super(_FailedResponder, self).__init__(listener, 1)
super().__init__(listener, 1)
def run(self):
self.started.set()
@ -471,7 +471,7 @@ class TestAmqpSend(_AmqpBrokerTestCaseAuto):
"""What happens if the replier times out?"""
class _TimeoutListener(_ListenerThread):
def __init__(self, listener):
super(_TimeoutListener, self).__init__(listener, 1)
super().__init__(listener, 1)
def run(self):
self.started.set()
@ -733,7 +733,7 @@ class TestAmqpNotification(_AmqpBrokerTestCaseAuto):
class TestAuthentication(test_utils.BaseTestCase):
"""Test user authentication using the old pyngus API"""
def setUp(self):
super(TestAuthentication, self).setUp()
super().setUp()
# for simplicity, encode the credentials as they would appear 'on the
# wire' in a SASL frame - username and password prefixed by zero.
user_credentials = ["\0joe\0secret"]
@ -743,7 +743,7 @@ class TestAuthentication(test_utils.BaseTestCase):
self._broker.start()
def tearDown(self):
super(TestAuthentication, self).tearDown()
super().tearDown()
self._broker.stop()
def test_authentication_ok(self):
@ -832,7 +832,7 @@ mech_list: ${mechs}
def setUp(self):
# fire up a test broker with the SASL config:
super(TestCyrusAuthentication, self).setUp()
super().setUp()
if TestCyrusAuthentication._conf_dir is None:
self.skipTest("Cyrus SASL tools not installed")
_mechs = TestCyrusAuthentication._mechs
@ -850,7 +850,7 @@ mech_list: ${mechs}
if self._broker:
self._broker.stop()
self._broker = None
super(TestCyrusAuthentication, self).tearDown()
super().tearDown()
def _authentication_test(self, addr, retry=None):
url = oslo_messaging.TransportURL.parse(self.conf, addr)
@ -927,7 +927,7 @@ mech_list: ${mechs}
class TestFailover(test_utils.BaseTestCase):
def setUp(self):
super(TestFailover, self).setUp()
super().setUp()
# configure different addressing modes on the brokers to test failing
# over from one type of backend to another
self.config(addressing_mode='dynamic', group="oslo_messaging_amqp")
@ -941,7 +941,7 @@ class TestFailover(test_utils.BaseTestCase):
self._broker_url = self._gen_transport_url(hosts)
def tearDown(self):
super(TestFailover, self).tearDown()
super().tearDown()
for broker in self._brokers:
if broker.is_alive():
broker.stop()
@ -1292,7 +1292,7 @@ class TestAddressing(test_utils.BaseTestCase):
expected = []
for n in targets_priorities:
# this is how the notifier creates an address:
topic = "%s.%s" % (n[0].topic, n[1])
topic = "{}.{}".format(n[0].topic, n[1])
target = oslo_messaging.Target(topic=topic)
driver.send_notification(target, {"context": "whatever"},
{"msg": topic}, 2.0)
@ -1614,7 +1614,7 @@ class TestSSL(TestFailover):
self._tmpdir = None
self.skipTest("OpenSSL tools not installed - skipping")
super(TestSSL, self).setUp()
super().setUp()
self.config(ssl_ca_file=self._ssl_config['ca_cert'],
group='oslo_messaging_amqp')
@ -1767,7 +1767,7 @@ class TestSSL(TestFailover):
self._broker = None
if self._tmpdir:
shutil.rmtree(self._tmpdir, ignore_errors=True)
super(TestSSL, self).tearDown()
super().tearDown()
@testtools.skipUnless(pyngus, "proton modules not present")
@ -1931,7 +1931,7 @@ class FakeBroker(threading.Thread):
try:
pyngus.read_socket_input(self.connection, self.socket)
self.connection.process(time.time())
except socket.error:
except OSError:
self._socket_error()
def send_output(self):
@ -1940,7 +1940,7 @@ class FakeBroker(threading.Thread):
pyngus.write_socket_output(self.connection,
self.socket)
self.connection.process(time.time())
except socket.error:
except OSError:
self._socket_error()
def _socket_error(self):
@ -2166,7 +2166,7 @@ class FakeBroker(threading.Thread):
self.daemon = True
self._pause.set()
self._my_socket.listen(10)
super(FakeBroker, self).start()
super().start()
def pause(self):
self._pause.clear()

View File

@ -27,7 +27,7 @@ load_tests = testscenarios.load_tests_apply_scenarios
class TestKafkaDriverLoad(test_utils.BaseTestCase):
def setUp(self):
super(TestKafkaDriverLoad, self).setUp()
super().setUp()
self.messaging_conf.transport_url = 'kafka:/'
def test_driver_load(self):
@ -79,7 +79,7 @@ class TestKafkaTransportURL(test_utils.BaseTestCase):
]
def setUp(self):
super(TestKafkaTransportURL, self).setUp()
super().setUp()
self.messaging_conf.transport_url = 'kafka:/'
def test_transport_url(self):
@ -100,7 +100,7 @@ class TestKafkaDriver(test_utils.BaseTestCase):
"""
def setUp(self):
super(TestKafkaDriver, self).setUp()
super().setUp()
self.messaging_conf.transport_url = 'kafka:/'
transport = oslo_messaging.get_notification_transport(self.conf)
self.driver = transport._driver
@ -202,7 +202,7 @@ class TestKafkaDriver(test_utils.BaseTestCase):
class TestKafkaConnection(test_utils.BaseTestCase):
def setUp(self):
super(TestKafkaConnection, self).setUp()
super().setUp()
self.messaging_conf.transport_url = 'kafka:/'
transport = oslo_messaging.get_notification_transport(self.conf)
self.driver = transport._driver

View File

@ -490,7 +490,7 @@ class TestRabbitTransportURL(test_utils.BaseTestCase):
]
def setUp(self):
super(TestRabbitTransportURL, self).setUp()
super().setUp()
self.messaging_conf.transport_url = 'rabbit:/'
self.config(heartbeat_timeout_threshold=0,
group='oslo_messaging_rabbit')
@ -778,7 +778,7 @@ def _declare_queue(target):
type='topic',
durable=False,
auto_delete=False)
topic = '%s.%s' % (target.topic, target.server)
topic = '{}.{}'.format(target.topic, target.server)
queue = kombu.entity.Queue(name=topic,
channel=channel,
exchange=exchange,
@ -840,7 +840,7 @@ class TestRequestWireFormat(test_utils.BaseTestCase):
cls._compression)
def setUp(self):
super(TestRequestWireFormat, self).setUp()
super().setUp()
self.uuids = []
self.orig_uuid4 = uuid.uuid4
self.useFixture(fixtures.MonkeyPatch('uuid.uuid4', self.mock_uuid4))
@ -928,7 +928,7 @@ def _create_producer(target):
type='topic',
durable=False,
auto_delete=False)
topic = '%s.%s' % (target.topic, target.server)
topic = '{}.{}'.format(target.topic, target.server)
producer = kombu.messaging.Producer(exchange=exchange,
channel=channel,
routing_key=topic)
@ -1035,7 +1035,7 @@ TestReplyWireFormat.generate_scenarios()
class RpcKombuHATestCase(test_utils.BaseTestCase):
def setUp(self):
super(RpcKombuHATestCase, self).setUp()
super().setUp()
transport_url = 'rabbit:/host1,host2,host3,host4,host5/'
self.messaging_conf.transport_url = transport_url
self.config(rabbit_retry_interval=0.01,

View File

@ -1,4 +1,3 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -47,7 +46,7 @@ class PoolTestCase(test_utils.BaseTestCase):
def create(self, retry=None):
return uuid.uuid4()
class ThreadWaitWaiter(object):
class ThreadWaitWaiter:
"""A gross hack.

View File

@ -50,7 +50,7 @@ class LoggingNotificationHandlerTestCase(utils.SkipIfNoTransportURL):
def test_logging(self):
# NOTE(gtt): Using different topic to make tests run in parallel
topic = 'test_logging_%s_driver_%s' % (self.priority, self.driver)
topic = 'test_logging_{}_driver_{}'.format(self.priority, self.driver)
if self.notify_url.startswith("kafka://"):
self.conf.set_override('consumer_group', str(uuid.uuid4()),

View File

@ -28,7 +28,7 @@ from oslo_messaging.tests.functional import utils
class CallTestCase(utils.SkipIfNoTransportURL):
def setUp(self):
super(CallTestCase, self).setUp(conf=cfg.ConfigOpts())
super().setUp(conf=cfg.ConfigOpts())
if self.rpc_url.startswith("kafka://"):
self.skipTest("kafka does not support RPC API")
@ -197,7 +197,7 @@ class CallTestCase(utils.SkipIfNoTransportURL):
target = oslo_messaging.Target(topic='topic_' + str(uuid.uuid4()),
server='server_' + str(uuid.uuid4()))
class _endpoint(object):
class _endpoint:
def delay(self, ctxt, seconds):
time.sleep(seconds)
return seconds
@ -231,7 +231,7 @@ class CallTestCase(utils.SkipIfNoTransportURL):
namespace="Name1",
version="7.5")
class _endpoint(object):
class _endpoint:
def __init__(self, target):
self.target = target()
@ -274,7 +274,7 @@ class CallTestCase(utils.SkipIfNoTransportURL):
def test_bad_endpoint(self):
# 'target' attribute is reserved and should be of type Target
class _endpoint(object):
class _endpoint:
def target(self, ctxt, echo):
return echo
@ -297,7 +297,7 @@ class CastTestCase(utils.SkipIfNoTransportURL):
# making the necessary assertions.
def setUp(self):
super(CastTestCase, self).setUp()
super().setUp()
if self.rpc_url.startswith("kafka://"):
self.skipTest("kafka does not support RPC API")
@ -572,7 +572,7 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
class MetricsTestCase(utils.SkipIfNoTransportURL):
def setUp(self):
super(MetricsTestCase, self).setUp(conf=cfg.ConfigOpts())
super().setUp(conf=cfg.ConfigOpts())
if self.rpc_url.startswith("kafka://"):
self.skipTest("kafka does not support RPC API")

View File

@ -22,7 +22,7 @@ from oslo_messaging.tests.functional import utils
from oslo_messaging.tests import utils as test_utils
class ConnectedPortMatcher(object):
class ConnectedPortMatcher:
def __init__(self, port):
self.port = port

View File

@ -25,7 +25,7 @@ from oslo_messaging.notify import notifier
from oslo_messaging.tests import utils as test_utils
class TestServerEndpoint(object):
class TestServerEndpoint:
"""This MessagingServer that will be used during functional testing."""
def __init__(self):
@ -58,7 +58,7 @@ class TransportFixture(fixtures.Fixture):
self.url = url
def setUp(self):
super(TransportFixture, self).setUp()
super().setUp()
self.transport = oslo_messaging.get_transport(self.conf, url=self.url)
def cleanUp(self):
@ -66,7 +66,7 @@ class TransportFixture(fixtures.Fixture):
self.transport.cleanup()
except fixtures.TimeoutException:
pass
super(TransportFixture, self).cleanUp()
super().cleanUp()
def wait(self):
# allow time for the server to connect to the broker
@ -77,7 +77,7 @@ class RPCTransportFixture(TransportFixture):
"""Fixture defined to setup RPC transport."""
def setUp(self):
super(RPCTransportFixture, self).setUp()
super().setUp()
self.transport = oslo_messaging.get_rpc_transport(self.conf,
url=self.url)
@ -86,7 +86,7 @@ class NotificationTransportFixture(TransportFixture):
"""Fixture defined to setup notification transport."""
def setUp(self):
super(NotificationTransportFixture, self).setUp()
super().setUp()
self.transport = oslo_messaging.get_notification_transport(
self.conf, url=self.url)
@ -96,7 +96,7 @@ class RpcServerFixture(fixtures.Fixture):
def __init__(self, conf, url, target, endpoint=None, ctrl_target=None,
executor='eventlet'):
super(RpcServerFixture, self).__init__()
super().__init__()
self.conf = conf
self.url = url
self.target = target
@ -106,7 +106,7 @@ class RpcServerFixture(fixtures.Fixture):
self.ctrl_target = ctrl_target or self.target
def setUp(self):
super(RpcServerFixture, self).setUp()
super().setUp()
endpoints = [self.endpoint, self]
transport = self.useFixture(RPCTransportFixture(self.conf, self.url))
self.server = oslo_messaging.get_rpc_server(
@ -121,7 +121,7 @@ class RpcServerFixture(fixtures.Fixture):
def cleanUp(self):
self._stop()
super(RpcServerFixture, self).cleanUp()
super().cleanUp()
def _start(self):
self.thread = test_utils.ServerThreadHelper(self.server)
@ -156,7 +156,7 @@ class RpcServerGroupFixture(fixtures.Fixture):
self.endpoint = endpoint
def setUp(self):
super(RpcServerGroupFixture, self).setUp()
super().setUp()
self.servers = [self.useFixture(self._server(t)) for t in self.targets]
def _target(self, server=None, fanout=False):
@ -205,7 +205,7 @@ class RpcServerGroupFixture(fixtures.Fixture):
raise ValueError("Invalid value for server: %r" % server)
class RpcCall(object):
class RpcCall:
def __init__(self, client, method, context):
self.client = client
self.method = method
@ -225,7 +225,7 @@ class RpcCast(RpcCall):
self.client.cast(self.context, self.method, **kwargs)
class ClientStub(object):
class ClientStub:
def __init__(self, transport, target, cast=False, name=None,
transport_options=None, **kwargs):
self.name = name or "functional-tests"
@ -244,7 +244,7 @@ class ClientStub(object):
return RpcCall(self.client, name, context)
class InvalidDistribution(object):
class InvalidDistribution:
def __init__(self, original, received):
self.original = original
self.received = received
@ -253,10 +253,11 @@ class InvalidDistribution(object):
self.wrong_order = []
def describe(self):
text = "Sent %s, got %s; " % (self.original, self.received)
text = "Sent {}, got {}; ".format(self.original, self.received)
e1 = ["%r was missing" % m for m in self.missing]
e2 = ["%r was not expected" % m for m in self.extra]
e3 = ["%r expected before %r" % (m[0], m[1]) for m in self.wrong_order]
e3 = ["{!r} expected before {!r}".format(
m[0], m[1]) for m in self.wrong_order]
return text + ", ".join(e1 + e2 + e3)
def __len__(self):
@ -266,7 +267,7 @@ class InvalidDistribution(object):
return {}
class IsValidDistributionOf(object):
class IsValidDistributionOf:
"""Test whether a given list can be split into particular
sub-lists. All items in the original list must be in exactly one
sub-list, and must appear in that sub-list in the same order with
@ -303,7 +304,7 @@ class IsValidDistributionOf(object):
class SkipIfNoTransportURL(test_utils.BaseTestCase):
def setUp(self, conf=cfg.CONF):
super(SkipIfNoTransportURL, self).setUp(conf=conf)
super().setUp(conf=conf)
self.rpc_url = os.environ.get('RPC_TRANSPORT_URL')
self.notify_url = os.environ.get('NOTIFY_TRANSPORT_URL')
@ -319,7 +320,7 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase):
class NotificationFixture(fixtures.Fixture):
def __init__(self, conf, url, topics, batch=None):
super(NotificationFixture, self).__init__()
super().__init__()
self.conf = conf
self.url = url
self.topics = topics
@ -328,7 +329,7 @@ class NotificationFixture(fixtures.Fixture):
self.batch = batch
def setUp(self):
super(NotificationFixture, self).setUp()
super().setUp()
targets = [oslo_messaging.Target(topic=t) for t in self.topics]
# add a special topic for internal notifications
targets.append(oslo_messaging.Target(topic=self.name))
@ -341,7 +342,7 @@ class NotificationFixture(fixtures.Fixture):
def cleanUp(self):
self._stop()
super(NotificationFixture, self).cleanUp()
super().cleanUp()
def _get_server(self, transport, targets):
return oslo_messaging.get_notification_listener(
@ -402,7 +403,7 @@ class NotificationFixture(fixtures.Fixture):
class BatchNotificationFixture(NotificationFixture):
def __init__(self, conf, url, topics, batch_size=5, batch_timeout=2):
super(BatchNotificationFixture, self).__init__(conf, url, topics)
super().__init__(conf, url, topics)
self.batch_size = batch_size
self.batch_timeout = batch_timeout

View File

@ -1,4 +1,3 @@
# Copyright 2013 eNovance
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,3 @@
# Copyright 2013 eNovance
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -28,7 +27,7 @@ from unittest import mock
load_tests = testscenarios.load_tests_apply_scenarios
class RestartableServerThread(object):
class RestartableServerThread:
def __init__(self, server):
self.server = server
self.thread = None
@ -48,9 +47,9 @@ class RestartableServerThread(object):
return True
class ListenerSetupMixin(object):
class ListenerSetupMixin:
class ThreadTracker(object):
class ThreadTracker:
def __init__(self):
self._received_msgs = 0
self.threads = []
@ -125,11 +124,11 @@ class ListenerSetupMixin(object):
class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
def __init__(self, *args):
super(TestNotifyListener, self).__init__(*args)
super().__init__(*args)
ListenerSetupMixin.__init__(self)
def setUp(self):
super(TestNotifyListener, self).setUp(conf=cfg.ConfigOpts())
super().setUp(conf=cfg.ConfigOpts())
ListenerSetupMixin.setUp(self)
self.useFixture(fixtures.MonkeyPatch(
'oslo_messaging._drivers.impl_fake.FakeExchangeManager._exchanges',

View File

@ -23,7 +23,7 @@ from unittest import mock
class PublishErrorsHandlerTestCase(test_utils.BaseTestCase):
"""Tests for log.PublishErrorsHandler"""
def setUp(self):
super(PublishErrorsHandlerTestCase, self).setUp()
super().setUp()
self.publisherrorshandler = (log_handler.
PublishErrorsHandler(logging.ERROR))

View File

@ -46,7 +46,7 @@ class TestLogNotifier(test_utils.BaseTestCase):
]
def setUp(self):
super(TestLogNotifier, self).setUp()
super().setUp()
self.addCleanup(oslo_messaging.notify._impl_test.reset)
self.config(driver=['test'],
group='oslo_messaging_notifications')

View File

@ -22,7 +22,7 @@ from oslo_messaging.tests import utils
from unittest import mock
class FakeApp(object):
class FakeApp:
def __call__(self, env, start_response):
body = 'Some response'
start_response('200 OK', [
@ -32,7 +32,7 @@ class FakeApp(object):
return [body]
class FakeFailingApp(object):
class FakeFailingApp:
def __call__(self, env, start_response):
raise Exception("It happens!")
@ -51,7 +51,7 @@ class NotifierMiddlewareTest(utils.BaseTestCase):
call_args = notify.call_args_list[0][0]
self.assertEqual('http.request', call_args[1])
self.assertEqual('INFO', call_args[3])
self.assertEqual(set(['request']),
self.assertEqual({'request'},
set(call_args[2].keys()))
request = call_args[2]['request']
@ -67,7 +67,7 @@ class NotifierMiddlewareTest(utils.BaseTestCase):
call_args = notify.call_args_list[1][0]
self.assertEqual('http.response', call_args[1])
self.assertEqual('INFO', call_args[3])
self.assertEqual(set(['request', 'response']),
self.assertEqual({'request', 'response'},
set(call_args[2].keys()))
request = call_args[2]['request']
@ -99,7 +99,7 @@ class NotifierMiddlewareTest(utils.BaseTestCase):
call_args = notify.call_args_list[0][0]
self.assertEqual('http.request', call_args[1])
self.assertEqual('INFO', call_args[3])
self.assertEqual(set(['request']),
self.assertEqual({'request'},
set(call_args[2].keys()))
request = call_args[2]['request']
@ -115,7 +115,7 @@ class NotifierMiddlewareTest(utils.BaseTestCase):
call_args = notify.call_args_list[1][0]
self.assertEqual('http.response', call_args[1])
self.assertEqual('INFO', call_args[3])
self.assertEqual(set(['request', 'exception']),
self.assertEqual({'request', 'exception'},
set(call_args[2].keys()))
request = call_args[2]['request']
@ -177,7 +177,7 @@ class NotifierMiddlewareTest(utils.BaseTestCase):
call_args = notify.call_args_list[0][0]
self.assertEqual('http.request', call_args[1])
self.assertEqual('INFO', call_args[3])
self.assertEqual(set(['request']),
self.assertEqual({'request'},
set(call_args[2].keys()))
request = call_args[2]['request']
@ -187,5 +187,5 @@ class NotifierMiddlewareTest(utils.BaseTestCase):
call_args = notify.call_args_list[1][0]
self.assertEqual('http.response', call_args[1])
self.assertEqual('INFO', call_args[3])
self.assertEqual(set(['request', 'response']),
self.assertEqual({'request', 'response'},
set(call_args[2].keys()))

View File

@ -40,7 +40,7 @@ from unittest import mock
load_tests = testscenarios.load_tests_apply_scenarios
class JsonMessageMatcher(object):
class JsonMessageMatcher:
def __init__(self, message):
self.message = message
@ -57,7 +57,7 @@ class _ReRaiseLoggedExceptionsFixture(fixtures.Fixture):
during cleanup.
"""
class FakeLogger(object):
class FakeLogger:
def __init__(self):
self.exceptions = []
@ -69,7 +69,7 @@ class _ReRaiseLoggedExceptionsFixture(fixtures.Fixture):
return
def setUp(self):
super(_ReRaiseLoggedExceptionsFixture, self).setUp()
super().setUp()
self.logger = self.FakeLogger()
@ -145,7 +145,7 @@ class TestMessagingNotifier(test_utils.BaseTestCase):
cls._retry)
def setUp(self):
super(TestMessagingNotifier, self).setUp()
super().setUp()
self.logger = self.useFixture(_ReRaiseLoggedExceptionsFixture()).logger
self.useFixture(fixtures.MockPatchObject(
@ -211,8 +211,8 @@ class TestMessagingNotifier(test_utils.BaseTestCase):
send_kwargs['retry'] = self.retry
else:
send_kwargs['retry'] = -1
target = oslo_messaging.Target(topic='%s.%s' % (topic,
self.priority))
target = oslo_messaging.Target(topic='{}.{}'.format(
topic, self.priority))
calls.append(mock.call(target,
self.ctxt,
message,
@ -307,7 +307,7 @@ class TestMessagingNotifierRetry(test_utils.BaseTestCase):
class TestSerializer(test_utils.BaseTestCase):
def setUp(self):
super(TestSerializer, self).setUp()
super().setUp()
self.addCleanup(_impl_test.reset)
@mock.patch('oslo_utils.timeutils.utcnow')
@ -484,7 +484,7 @@ class TestNotificationConfig(test_utils.BaseTestCase):
class TestRoutingNotifier(test_utils.BaseTestCase):
def setUp(self):
super(TestRoutingNotifier, self).setUp()
super().setUp()
self.config(driver=['routing'],
group='oslo_messaging_notifications')
@ -503,7 +503,7 @@ class TestRoutingNotifier(test_utils.BaseTestCase):
return extension.ExtensionManager.make_test_instance([])
def test_should_load_plugin(self):
self.router.used_drivers = set(["zoo", "blah"])
self.router.used_drivers = {"zoo", "blah"}
ext = mock.MagicMock()
ext.name = "foo"
self.assertFalse(self.router._should_load_plugin(ext))

View File

@ -1,4 +1,3 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -209,7 +208,7 @@ class TestCastToTarget(test_utils.BaseTestCase):
cls._prepare)
def setUp(self):
super(TestCastToTarget, self).setUp(conf=cfg.ConfigOpts())
super().setUp(conf=cfg.ConfigOpts())
def test_cast_to_target(self):
target = oslo_messaging.Target(**self.ctor)

View File

@ -24,7 +24,7 @@ from unittest import mock
load_tests = testscenarios.load_tests_apply_scenarios
class _FakeEndpoint(object):
class _FakeEndpoint:
def __init__(self, target=None):
self.target = target
@ -229,7 +229,7 @@ class TestDispatcherWithPingEndpoint(test_utils.BaseTestCase):
self.assertEqual('pong', res)
def test_dispatcher_with_ping_already_used(self):
class MockEndpoint(object):
class MockEndpoint:
def oslo_rpc_server_ping(self, ctxt, **kwargs):
return 'not_pong'
@ -263,7 +263,7 @@ class TestSerializer(test_utils.BaseTestCase):
endpoint.foo = mock.Mock()
args = dict([(k, 'd' + v) for k, v in self.args.items()])
args = {k: 'd' + v for k, v in self.args.items()}
endpoint.foo.return_value = self.retval
serializer.serialize_entity = mock.Mock()
@ -303,7 +303,7 @@ class TestMonitorFailure(test_utils.BaseTestCase):
sending the heartbeat.
"""
class _SleepyEndpoint(object):
class _SleepyEndpoint:
def __init__(self, target=None):
self.target = target

View File

@ -1,4 +1,3 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -33,9 +32,9 @@ from oslo_messaging.tests import utils as test_utils
load_tests = testscenarios.load_tests_apply_scenarios
class ServerSetupMixin(object):
class ServerSetupMixin:
class Server(object):
class Server:
def __init__(self, transport, topic, server, endpoint, serializer,
exchange):
self.controller = ServerSetupMixin.ServerController()
@ -60,14 +59,14 @@ class ServerSetupMixin(object):
def start(self):
self.server.start()
class ServerController(object):
class ServerController:
def __init__(self):
self.stopped = eventletutils.Event()
def stop(self, ctxt):
self.stopped.set()
class TestSerializer(object):
class TestSerializer:
def serialize_entity(self, ctxt, entity):
return ('s' + entity) if entity else entity
@ -76,10 +75,10 @@ class ServerSetupMixin(object):
return ('d' + entity) if entity else entity
def serialize_context(self, ctxt):
return dict([(k, 's' + v) for k, v in ctxt.items()])
return {k: 's' + v for k, v in ctxt.items()}
def deserialize_context(self, ctxt):
return dict([(k, 'd' + v) for k, v in ctxt.items()])
return {k: 'd' + v for k, v in ctxt.items()}
def __init__(self):
self.serializer = self.TestSerializer()
@ -109,11 +108,11 @@ class ServerSetupMixin(object):
class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
def __init__(self, *args):
super(TestRPCServer, self).__init__(*args)
super().__init__(*args)
ServerSetupMixin.__init__(self)
def setUp(self):
super(TestRPCServer, self).setUp(conf=cfg.ConfigOpts())
super().setUp(conf=cfg.ConfigOpts())
# FakeExchangeManager uses a class-level exchanges mapping; "reset" it
# before tests assert amount of items stored
self.useFixture(fixtures.MonkeyPatch(
@ -191,7 +190,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
"""
def __init__(self, *args, **kwargs):
super(MagicMockIgnoreArgs, self).__init__()
super().__init__()
server = oslo_messaging.get_rpc_server(transport, target, endpoints,
serializer=serializer)
@ -263,7 +262,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
finished = False
wait = threading.Condition()
class TestEndpoint(object):
class TestEndpoint:
def ping(self, ctxt, arg):
with wait:
if not finished:
@ -299,7 +298,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
def test_cast(self):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
class TestEndpoint(object):
class TestEndpoint:
def __init__(self):
self.pings = []
@ -326,7 +325,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
transport_cli = oslo_messaging.get_rpc_transport(self.conf,
url='fake:')
class TestEndpoint(object):
class TestEndpoint:
def ping(self, ctxt, arg):
return arg
@ -345,7 +344,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
def test_direct_call(self):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
class TestEndpoint(object):
class TestEndpoint:
def ping(self, ctxt, arg):
return arg
@ -365,7 +364,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
def test_context(self):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
class TestEndpoint(object):
class TestEndpoint:
def ctxt_check(self, ctxt, key):
return ctxt[key]
@ -382,7 +381,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
def test_failure(self):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
class TestEndpoint(object):
class TestEndpoint:
def ping(self, ctxt, arg):
raise ValueError(arg)
@ -440,7 +439,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
self.useFixture(fixtures.MockPatchObject(
rpc_server_module.LOG, 'error', stub_error))
class TestEndpoint(object):
class TestEndpoint:
@oslo_messaging.expected_exceptions(ValueError)
def ping(self, ctxt, arg):
raise ValueError(arg)
@ -569,11 +568,11 @@ class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):
cls.scenarios = [m(i) for i in cls.scenarios]
def __init__(self, *args):
super(TestMultipleServers, self).__init__(*args)
super().__init__(*args)
ServerSetupMixin.__init__(self)
def setUp(self):
super(TestMultipleServers, self).setUp(conf=cfg.ConfigOpts())
super().setUp(conf=cfg.ConfigOpts())
self.useFixture(fixtures.MonkeyPatch(
'oslo_messaging._drivers.impl_fake.FakeExchangeManager._exchanges',
new_value={}))
@ -587,7 +586,7 @@ class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):
else:
transport2 = transport1
class TestEndpoint(object):
class TestEndpoint:
def __init__(self):
self.pings = []
@ -651,7 +650,7 @@ TestMultipleServers.generate_scenarios()
class TestServerLocking(test_utils.BaseTestCase):
def setUp(self):
super(TestServerLocking, self).setUp(conf=cfg.ConfigOpts())
super().setUp(conf=cfg.ConfigOpts())
def _logmethod(name):
def method(self, *args, **kwargs):
@ -661,7 +660,7 @@ class TestServerLocking(test_utils.BaseTestCase):
executors = []
class FakeExecutor(object):
class FakeExecutor:
def __init__(self, *args, **kwargs):
self._lock = threading.Lock()
self._calls = []
@ -731,7 +730,7 @@ class TestServerLocking(test_utils.BaseTestCase):
running_event.set()
start_event.wait()
super(SteppingFakeExecutor, self).__init__(*args, **kwargs)
super().__init__(*args, **kwargs)
done_event.set()
finish_event.wait()

View File

@ -1,4 +1,3 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -36,7 +35,7 @@ class NovaStyleException(Exception):
self.kwargs = kwargs
if not message:
message = self.format % kwargs
super(NovaStyleException, self).__init__(message)
super().__init__(message)
class KwargsStyleException(NovaStyleException):

View File

@ -1,4 +1,3 @@
# Copyright 2012 OpenStack Foundation
# Copyright 2013 Red Hat, Inc.
#

View File

@ -1,4 +1,3 @@
# Copyright 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,3 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,3 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -27,7 +26,7 @@ from oslo_messaging import transport
load_tests = testscenarios.load_tests_apply_scenarios
class _FakeDriver(object):
class _FakeDriver:
def __init__(self, conf):
self.conf = conf
@ -42,7 +41,7 @@ class _FakeDriver(object):
pass
class _FakeManager(object):
class _FakeManager:
def __init__(self, driver):
self.driver = driver
@ -160,13 +159,13 @@ class GetTransportSadPathTestCase(test_utils.BaseTestCase):
class _SetDefaultsFixture(fixtures.Fixture):
def __init__(self, set_defaults, opts, *names):
super(_SetDefaultsFixture, self).__init__()
super().__init__()
self.set_defaults = set_defaults
self.opts = opts
self.names = names
def setUp(self):
super(_SetDefaultsFixture, self).setUp()
super().setUp()
# FIXME(markmc): this comes from Id5c1f3ba
def first(seq, default=None, key=None):
@ -190,7 +189,7 @@ class _SetDefaultsFixture(fixtures.Fixture):
class TestSetDefaults(test_utils.BaseTestCase):
def setUp(self):
super(TestSetDefaults, self).setUp(conf=cfg.ConfigOpts())
super().setUp(conf=cfg.ConfigOpts())
self.useFixture(_SetDefaultsFixture(
oslo_messaging.set_transport_defaults,
transport._transport_opts,
@ -293,7 +292,7 @@ class TestTransportMethodArgs(test_utils.BaseTestCase):
class TestTransportUrlCustomisation(test_utils.BaseTestCase):
def setUp(self):
super(TestTransportUrlCustomisation, self).setUp()
super().setUp()
def transport_url_parse(url):
return transport.TransportURL.parse(self.conf, url)
@ -333,7 +332,7 @@ class TestTransportUrlCustomisation(test_utils.BaseTestCase):
class TestTransportHostCustomisation(test_utils.BaseTestCase):
def setUp(self):
super(TestTransportHostCustomisation, self).setUp()
super().setUp()
self.host1 = transport.TransportHost("host1", 5662, "user", "pass")
self.host2 = transport.TransportHost("host1", 5662, "user", "pass")
self.host3 = transport.TransportHost("host1", 5663, "user", "pass")

View File

@ -1,4 +1,3 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,4 +1,3 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -33,7 +33,7 @@ TRUE_VALUES = ('true', '1', 'yes')
class BaseTestCase(base.BaseTestCase):
def setUp(self, conf=cfg.CONF):
super(BaseTestCase, self).setUp()
super().setUp()
from oslo_messaging import conffixture
self.messaging_conf = self.useFixture(conffixture.ConfFixture(conf))
@ -62,14 +62,14 @@ class BaseTestCase(base.BaseTestCase):
class ServerThreadHelper(threading.Thread):
def __init__(self, server):
super(ServerThreadHelper, self).__init__()
super().__init__()
self.daemon = True
self._server = server
self._stop_event = eventletutils.Event()
self._start_event = eventletutils.Event()
def start(self):
super(ServerThreadHelper, self).start()
super().start()
self._start_event.wait()
def run(self):

View File

@ -1,4 +1,3 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
@ -73,7 +72,7 @@ def set_transport_defaults(control_exchange):
control_exchange=control_exchange)
class Transport(object):
class Transport:
"""A messaging transport.
@ -163,21 +162,21 @@ class RPCTransport(Transport):
"""Transport object for RPC."""
def __init__(self, driver):
super(RPCTransport, self).__init__(driver)
super().__init__(driver)
class NotificationTransport(Transport):
"""Transport object for notifications."""
def __init__(self, driver):
super(NotificationTransport, self).__init__(driver)
super().__init__(driver)
class InvalidTransportURL(exceptions.MessagingException):
"""Raised if transport URL is invalid."""
def __init__(self, url, msg):
super(InvalidTransportURL, self).__init__(msg)
super().__init__(msg)
self.url = url
@ -185,8 +184,8 @@ class DriverLoadFailure(exceptions.MessagingException):
"""Raised if a transport driver can't be loaded."""
def __init__(self, driver, ex):
msg = 'Failed to load transport driver "%s": %s' % (driver, ex)
super(DriverLoadFailure, self).__init__(msg)
msg = 'Failed to load transport driver "{}": {}'.format(driver, ex)
super().__init__(msg)
self.driver = driver
self.ex = ex
@ -249,7 +248,7 @@ def get_transport(conf, url=None, allowed_remote_exmods=None):
transport_cls=RPCTransport)
class TransportHost(object):
class TransportHost:
"""A host element of a parsed transport URL."""
@ -278,7 +277,7 @@ class TransportHost(object):
return '<TransportHost ' + values + '>'
class TransportOptions(object):
class TransportOptions:
def __init__(self, at_least_once=False):
self._at_least_once = at_least_once
@ -288,7 +287,7 @@ class TransportOptions(object):
return self._at_least_once
class TransportURL(object):
class TransportURL:
"""A parsed transport URL.
@ -405,7 +404,7 @@ class TransportURL(object):
netlocs.append(netloc)
# Assemble the transport URL
url = '%s://%s/' % (self.transport, ','.join(netlocs))
url = '{}://{}/'.format(self.transport, ','.join(netlocs))
if self.virtual_host:
url += parse.quote(self.virtual_host)

View File

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

View File

@ -117,7 +117,7 @@ def update_message(message, **kwargs):
return Message(*message)._replace(**kwargs)
class MessageStatsCollector(object):
class MessageStatsCollector:
def __init__(self, label):
self.label = label
self.buffer = [] # buffer to store messages during report interval
@ -237,7 +237,7 @@ class MessageStatsCollector(object):
return stats
class NotifyEndpoint(object):
class NotifyEndpoint:
def __init__(self, wait_before_answer, requeue):
self.wait_before_answer = wait_before_answer
self.requeue = requeue
@ -273,7 +273,7 @@ def notify_server(transport, topic, wait_before_answer, duration, requeue):
return endpoints[0]
class BatchNotifyEndpoint(object):
class BatchNotifyEndpoint:
def __init__(self, wait_before_answer, requeue):
self.wait_before_answer = wait_before_answer
self.requeue = requeue
@ -306,7 +306,7 @@ def batch_notify_server(transport, topic, wait_before_answer, duration,
return endpoints[0]
class RpcEndpoint(object):
class RpcEndpoint:
def __init__(self, wait_before_answer):
self.wait_before_answer = wait_before_answer
self.received_messages = MessageStatsCollector('server')
@ -325,7 +325,7 @@ class RpcEndpoint(object):
return reply
class ServerControlEndpoint(object):
class ServerControlEndpoint:
def __init__(self, controlled_server):
self.connected_clients = set()
self.controlled_server = controlled_server
@ -356,7 +356,7 @@ class ServerControlEndpoint(object):
self.controlled_server.wait()
class Client(object):
class Client:
def __init__(self, client_id, client, method, has_result,
wait_after_msg):
self.client_id = client_id
@ -413,10 +413,9 @@ class RPCClient(Client):
client = rpc.get_rpc_client(transport, target)
method = _rpc_cast if is_cast else _rpc_call
super(RPCClient, self).__init__(client_id,
client.prepare(timeout=timeout),
method,
not is_cast, wait_after_msg)
super().__init__(client_id,
client.prepare(timeout=timeout),
method, not is_cast, wait_after_msg)
self.sync_mode = sync_mode
self.is_sync = False
@ -430,7 +429,7 @@ class RPCClient(Client):
def send_msg(self):
if self.sync_mode and not self.is_sync:
self.is_sync = self.sync_start()
super(RPCClient, self).send_msg()
super().send_msg()
def sync_start(self):
try:
@ -441,7 +440,7 @@ class RPCClient(Client):
LOG.error('The client: %s failed to sync with %s.' %
(self.client_id, self.client.target))
return False
LOG.info('The client: %s successfully sync with %s' % (
LOG.info('The client: {} successfully sync with {}'.format(
self.client_id, self.client.target))
return True
@ -464,8 +463,7 @@ class NotifyClient(Client):
client = notify.Notifier(transport, driver='messaging', topics=topic)
client = client.prepare(publisher_id='publisher-%d' % client_id)
method = _notify
super(NotifyClient, self).__init__(client_id, client, method,
False, wait_after_msg)
super().__init__(client_id, client, method, False, wait_after_msg)
def generate_messages(messages_count):
@ -668,7 +666,7 @@ def write_json_file(filename, output):
class SignalExit(SystemExit):
def __init__(self, signo, exccode=1):
super(SignalExit, self).__init__(exccode)
super().__init__(exccode)
self.signo = signo