Merge "consumer connections not closed properly"
This commit is contained in:
commit
dda4e698b3
@ -461,6 +461,7 @@ class Connection(object):
|
|||||||
# NOTE(sileht): if purpose is PURPOSE_LISTEN
|
# NOTE(sileht): if purpose is PURPOSE_LISTEN
|
||||||
# the consume code does the heartbeat stuff
|
# the consume code does the heartbeat stuff
|
||||||
# we don't need a thread
|
# we don't need a thread
|
||||||
|
self._heartbeat_thread = None
|
||||||
if purpose == rpc_amqp.PURPOSE_SEND:
|
if purpose == rpc_amqp.PURPOSE_SEND:
|
||||||
self._heartbeat_start()
|
self._heartbeat_start()
|
||||||
|
|
||||||
|
@ -261,28 +261,26 @@ class TestRabbitConsume(test_utils.BaseTestCase):
|
|||||||
'kombu+memory:////')
|
'kombu+memory:////')
|
||||||
self.addCleanup(transport.cleanup)
|
self.addCleanup(transport.cleanup)
|
||||||
channel = mock.Mock()
|
channel = mock.Mock()
|
||||||
conn = transport._driver._get_connection(amqp.PURPOSE_LISTEN
|
with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn:
|
||||||
).connection
|
conn.connection.connection.recoverable_channel_errors = (IOError,)
|
||||||
conn.connection.recoverable_channel_errors = (IOError,)
|
with mock.patch.object(conn.connection.connection, 'channel',
|
||||||
with mock.patch.object(conn.connection, 'channel',
|
side_effect=[IOError, IOError, channel]):
|
||||||
side_effect=[IOError, IOError, channel]):
|
conn.connection.reset()
|
||||||
conn.reset()
|
self.assertEqual(channel, conn.connection.channel)
|
||||||
self.assertEqual(channel, conn.channel)
|
|
||||||
|
|
||||||
def test_connection_ack_have_disconnected_kombu_connection(self):
|
def test_connection_ack_have_disconnected_kombu_connection(self):
|
||||||
transport = oslo_messaging.get_transport(self.conf,
|
transport = oslo_messaging.get_transport(self.conf,
|
||||||
'kombu+memory:////')
|
'kombu+memory:////')
|
||||||
self.addCleanup(transport.cleanup)
|
self.addCleanup(transport.cleanup)
|
||||||
conn = transport._driver._get_connection(amqp.PURPOSE_LISTEN
|
with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn:
|
||||||
).connection
|
channel = conn.connection.channel
|
||||||
channel = conn.channel
|
with mock.patch('kombu.connection.Connection.connected',
|
||||||
with mock.patch('kombu.connection.Connection.connected',
|
new_callable=mock.PropertyMock,
|
||||||
new_callable=mock.PropertyMock,
|
return_value=False):
|
||||||
return_value=False):
|
self.assertRaises(driver_common.Timeout,
|
||||||
self.assertRaises(driver_common.Timeout,
|
conn.connection.consume, timeout=0.01)
|
||||||
conn.consume, timeout=0.01)
|
# Ensure a new channel have been setuped
|
||||||
# Ensure a new channel have been setuped
|
self.assertNotEqual(channel, conn.connection.channel)
|
||||||
self.assertNotEqual(channel, conn.channel)
|
|
||||||
|
|
||||||
|
|
||||||
class TestRabbitTransportURL(test_utils.BaseTestCase):
|
class TestRabbitTransportURL(test_utils.BaseTestCase):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user