diff --git a/tests/api/v1/test_get_query_ts.py b/tests/api/v1/test_get_query_ts.py index 1e72d598f..0d63123ba 100644 --- a/tests/api/v1/test_get_query_ts.py +++ b/tests/api/v1/test_get_query_ts.py @@ -21,57 +21,61 @@ import datetime from ceilometer.api.v1 import blueprint +from ceilometer.tests import base -def test_get_query_timestamps_none_specified(): - result = blueprint._get_query_timestamps() - expected = {'start_timestamp': None, - 'end_timestamp': None, - 'query_start': None, - 'query_end': None, - 'search_offset': 0, +class TestQueryTimestamps(base.TestCase): + + def test_get_query_timestamps_none_specified(self): + result = blueprint._get_query_timestamps() + expected = {'start_timestamp': None, + 'end_timestamp': None, + 'query_start': None, + 'query_end': None, + 'search_offset': 0, + } + self.assertEqual(result, expected) + + def test_get_query_timestamps_start(self): + args = {'start_timestamp': '2012-09-20T12:13:14'} + result = blueprint._get_query_timestamps(args) + expected = {'start_timestamp': datetime.datetime(2012, 9, 20, + 12, 13, 14), + 'end_timestamp': None, + 'query_start': datetime.datetime(2012, 9, 20, + 12, 13, 14), + 'query_end': None, + 'search_offset': 0, + } + self.assertEqual(result, expected) + + def test_get_query_timestamps_end(self): + args = {'end_timestamp': '2012-09-20T12:13:14'} + result = blueprint._get_query_timestamps(args) + expected = {'end_timestamp': datetime.datetime(2012, 9, 20, + 12, 13, 14), + 'start_timestamp': None, + 'query_end': datetime.datetime(2012, 9, 20, + 12, 13, 14), + 'query_start': None, + 'search_offset': 0, + } + self.assertEqual(result, expected) + + def test_get_query_timestamps_with_offset(self): + args = {'start_timestamp': '2012-09-20T12:13:14', + 'end_timestamp': '2012-09-20T13:24:25', + 'search_offset': '20', } - - assert result == expected - - -def test_get_query_timestamps_start(): - args = {'start_timestamp': '2012-09-20T12:13:14'} - result = blueprint._get_query_timestamps(args) - expected = {'start_timestamp': datetime.datetime(2012, 9, 20, 12, 13, 14), - 'end_timestamp': None, - 'query_start': datetime.datetime(2012, 9, 20, 12, 13, 14), - 'query_end': None, - 'search_offset': 0, - } - - assert result == expected - - -def test_get_query_timestamps_end(): - args = {'end_timestamp': '2012-09-20T12:13:14'} - result = blueprint._get_query_timestamps(args) - expected = {'end_timestamp': datetime.datetime(2012, 9, 20, 12, 13, 14), - 'start_timestamp': None, - 'query_end': datetime.datetime(2012, 9, 20, 12, 13, 14), - 'query_start': None, - 'search_offset': 0, - } - - assert result == expected - - -def test_get_query_timestamps_with_offset(): - args = {'start_timestamp': '2012-09-20T12:13:14', - 'end_timestamp': '2012-09-20T13:24:25', - 'search_offset': '20', - } - result = blueprint._get_query_timestamps(args) - expected = {'query_end': datetime.datetime(2012, 9, 20, 13, 44, 25), - 'query_start': datetime.datetime(2012, 9, 20, 11, 53, 14), - 'end_timestamp': datetime.datetime(2012, 9, 20, 13, 24, 25), - 'start_timestamp': datetime.datetime(2012, 9, 20, 12, 13, 14), - 'search_offset': 20, - } - - assert result == expected + result = blueprint._get_query_timestamps(args) + expected = {'query_end': datetime.datetime(2012, 9, 20, + 13, 44, 25), + 'query_start': datetime.datetime(2012, 9, 20, + 11, 53, 14), + 'end_timestamp': datetime.datetime(2012, 9, 20, + 13, 24, 25), + 'start_timestamp': datetime.datetime(2012, 9, 20, + 12, 13, 14), + 'search_offset': 20, + } + self.assertEqual(result, expected) diff --git a/tests/central/test_manager.py b/tests/central/test_manager.py index e0c314473..520439410 100644 --- a/tests/central/test_manager.py +++ b/tests/central/test_manager.py @@ -19,17 +19,19 @@ """ import mock - -from ceilometer.central import manager from keystoneclient.v2_0 import client as ksclient +from ceilometer.central import manager +from ceilometer.tests import base from tests import agentbase -@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) -def test_load_plugins(): - mgr = manager.AgentManager() - assert list(mgr.pollster_manager), 'Failed to load any plugins' +class TestManager(base.TestCase): + + @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) + def test_load_plugins(self): + mgr = manager.AgentManager() + self.assertIsNotNone(list(mgr.pollster_manager)) class TestRunTasks(agentbase.BaseAgentManagerTestCase): diff --git a/tests/compute/test_manager.py b/tests/compute/test_manager.py index 13878ce36..6287422af 100644 --- a/tests/compute/test_manager.py +++ b/tests/compute/test_manager.py @@ -21,14 +21,16 @@ import mock from ceilometer import nova_client from ceilometer.compute import manager - +from ceilometer.tests import base from tests import agentbase -@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) -def test_load_plugins(): - mgr = manager.AgentManager() - assert list(mgr.pollster_manager), 'Failed to load any plugins' +class TestManager(base.TestCase): + + @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) + def test_load_plugins(self): + mgr = manager.AgentManager() + self.assertIsNotNone(list(mgr.pollster_manager)) class TestRunTasks(agentbase.BaseAgentManagerTestCase): diff --git a/tests/publisher/test_rpc_publisher.py b/tests/publisher/test_rpc_publisher.py index 7ddd32067..828155260 100644 --- a/tests/publisher/test_rpc_publisher.py +++ b/tests/publisher/test_rpc_publisher.py @@ -19,7 +19,6 @@ """ import datetime - from oslo.config import cfg from ceilometer import counter @@ -29,163 +28,117 @@ from ceilometer.publisher import rpc from ceilometer.tests import base -def test_compute_signature_change_key(): - sig1 = rpc.compute_signature({'a': 'A', 'b': 'B'}, - 'not-so-secret') - sig2 = rpc.compute_signature({'A': 'A', 'b': 'B'}, - 'not-so-secret') - assert sig1 != sig2 +class TestSignature(base.TestCase): + def test_compute_signature_change_key(self): + sig1 = rpc.compute_signature({'a': 'A', 'b': 'B'}, + 'not-so-secret') + sig2 = rpc.compute_signature({'A': 'A', 'b': 'B'}, + 'not-so-secret') + self.assertNotEqual(sig1, sig2) -def test_compute_signature_change_value(): - sig1 = rpc.compute_signature({'a': 'A', 'b': 'B'}, - 'not-so-secret') - sig2 = rpc.compute_signature({'a': 'a', 'b': 'B'}, - 'not-so-secret') - assert sig1 != sig2 + def test_compute_signature_change_value(self): + sig1 = rpc.compute_signature({'a': 'A', 'b': 'B'}, + 'not-so-secret') + sig2 = rpc.compute_signature({'a': 'a', 'b': 'B'}, + 'not-so-secret') + self.assertNotEqual(sig1, sig2) + def test_compute_signature_same(self): + sig1 = rpc.compute_signature({'a': 'A', 'b': 'B'}, + 'not-so-secret') + sig2 = rpc.compute_signature({'a': 'A', 'b': 'B'}, + 'not-so-secret') + self.assertEqual(sig1, sig2) -def test_compute_signature_same(): - sig1 = rpc.compute_signature({'a': 'A', 'b': 'B'}, - 'not-so-secret') - sig2 = rpc.compute_signature({'a': 'A', 'b': 'B'}, - 'not-so-secret') - assert sig1 == sig2 + def test_compute_signature_signed(self): + data = {'a': 'A', 'b': 'B'} + sig1 = rpc.compute_signature(data, 'not-so-secret') + data['message_signature'] = sig1 + sig2 = rpc.compute_signature(data, 'not-so-secret') + self.assertEqual(sig1, sig2) + def test_compute_signature_use_configured_secret(self): + data = {'a': 'A', 'b': 'B'} + sig1 = rpc.compute_signature(data, 'not-so-secret') + sig2 = rpc.compute_signature(data, 'different-value') + self.assertNotEqual(sig1, sig2) -def test_compute_signature_signed(): - data = {'a': 'A', 'b': 'B'} - sig1 = rpc.compute_signature(data, 'not-so-secret') - data['message_signature'] = sig1 - sig2 = rpc.compute_signature(data, 'not-so-secret') - assert sig1 == sig2 + def test_verify_signature_signed(self): + data = {'a': 'A', 'b': 'B'} + sig1 = rpc.compute_signature(data, 'not-so-secret') + data['message_signature'] = sig1 + self.assertTrue(rpc.verify_signature(data, 'not-so-secret')) + def test_verify_signature_unsigned(self): + data = {'a': 'A', 'b': 'B'} + self.assertFalse(rpc.verify_signature(data, 'not-so-secret')) -def test_compute_signature_use_configured_secret(): - data = {'a': 'A', 'b': 'B'} - sig1 = rpc.compute_signature(data, 'not-so-secret') - sig2 = rpc.compute_signature(data, 'different-value') - assert sig1 != sig2 + def test_verify_signature_incorrect(self): + data = {'a': 'A', 'b': 'B', + 'message_signature': 'Not the same'} + self.assertFalse(rpc.verify_signature(data, 'not-so-secret')) - -def test_verify_signature_signed(): - data = {'a': 'A', 'b': 'B'} - sig1 = rpc.compute_signature(data, 'not-so-secret') - data['message_signature'] = sig1 - assert rpc.verify_signature(data, 'not-so-secret') - - -def test_verify_signature_unsigned(): - data = {'a': 'A', 'b': 'B'} - assert not rpc.verify_signature(data, 'not-so-secret') - - -def test_verify_signature_incorrect(): - data = {'a': 'A', 'b': 'B', - 'message_signature': 'Not the same'} - assert not rpc.verify_signature(data, 'not-so-secret') - - -def test_verify_signature_nested(): - data = {'a': 'A', - 'b': 'B', - 'nested': {'a': 'A', - 'b': 'B', - }, - } - data['message_signature'] = rpc.compute_signature( - data, - 'not-so-secret') - assert rpc.verify_signature(data, 'not-so-secret') - - -def test_verify_signature_nested_json(): - data = {'a': 'A', - 'b': 'B', - 'nested': {'a': 'A', - 'b': 'B', - 'c': ('c',), - 'd': ['d'] - }, - } - data['message_signature'] = rpc.compute_signature( - data, - 'not-so-secret') - jsondata = jsonutils.loads(jsonutils.dumps(data)) - assert rpc.verify_signature(jsondata, 'not-so-secret') - - -TEST_COUNTER = counter.Counter(name='name', - type='typ', - unit='', - volume=1, - user_id='user', - project_id='project', - resource_id=2, - timestamp='today', - resource_metadata={'key': 'value'}, - ) - -TEST_NOTICE = { - u'_context_auth_token': u'3d8b13de1b7d499587dfc69b77dc09c2', - u'_context_is_admin': True, - u'_context_project_id': u'7c150a59fe714e6f9263774af9688f0e', - u'_context_quota_class': None, - u'_context_read_deleted': u'no', - u'_context_remote_address': u'10.0.2.15', - u'_context_request_id': u'req-d68b36e0-9233-467f-9afb-d81435d64d66', - u'_context_roles': [u'admin'], - u'_context_timestamp': u'2012-05-08T20:23:41.425105', - u'_context_user_id': u'1e3ce043029547f1a61c1996d1a531a2', - u'event_type': u'compute.instance.create.end', - u'message_id': u'dae6f69c-00e0-41c0-b371-41ec3b7f4451', - u'payload': {u'created_at': u'2012-05-08 20:23:41', - u'deleted_at': u'', - u'disk_gb': 0, - u'display_name': u'testme', - u'fixed_ips': [{u'address': u'10.0.0.2', - u'floating_ips': [], - u'meta': {}, - u'type': u'fixed', - u'version': 4}], - u'image_ref_url': u'http://10.0.2.15:9292/images/UUID', - u'instance_id': u'9f9d01b9-4a58-4271-9e27-398b21ab20d1', - u'instance_type': u'm1.tiny', - u'instance_type_id': 2, - u'launched_at': u'2012-05-08 20:23:47.985999', - u'memory_mb': 512, - u'state': u'active', - u'state_description': u'', - u'tenant_id': u'7c150a59fe714e6f9263774af9688f0e', - u'user_id': u'1e3ce043029547f1a61c1996d1a531a2'}, - u'priority': u'INFO', - u'publisher_id': u'compute.vagrant-precise', - u'timestamp': u'2012-05-08 20:23:48.028195', -} - - -def test_meter_message_from_counter_signed(): - msg = rpc.meter_message_from_counter( - TEST_COUNTER, - 'not-so-secret', - 'src') - assert 'message_signature' in msg - - -def test_meter_message_from_counter_field(): - def compare(f, c, msg_f, msg): - assert msg == c - msg = rpc.meter_message_from_counter( - TEST_COUNTER, 'not-so-secret', - 'src') - name_map = {'name': 'counter_name', - 'type': 'counter_type', - 'unit': 'counter_unit', - 'volume': 'counter_volume', + def test_verify_signature_nested(self): + data = {'a': 'A', + 'b': 'B', + 'nested': {'a': 'A', + 'b': 'B', + }, } - for f in TEST_COUNTER._fields: - msg_f = name_map.get(f, f) - yield compare, f, getattr(TEST_COUNTER, f), msg_f, msg[msg_f] + data['message_signature'] = rpc.compute_signature( + data, + 'not-so-secret') + self.assertTrue(rpc.verify_signature(data, 'not-so-secret')) + + def test_verify_signature_nested_json(self): + data = {'a': 'A', + 'b': 'B', + 'nested': {'a': 'A', + 'b': 'B', + 'c': ('c',), + 'd': ['d'] + }, + } + data['message_signature'] = rpc.compute_signature( + data, + 'not-so-secret') + jsondata = jsonutils.loads(jsonutils.dumps(data)) + self.assertTrue(rpc.verify_signature(jsondata, 'not-so-secret')) + + +class TestCounter(base.TestCase): + + TEST_COUNTER = counter.Counter(name='name', + type='typ', + unit='', + volume=1, + user_id='user', + project_id='project', + resource_id=2, + timestamp='today', + resource_metadata={'key': 'value'}, + ) + + def test_meter_message_from_counter_signed(self): + msg = rpc.meter_message_from_counter(self.TEST_COUNTER, + 'not-so-secret', + 'src') + self.assertIn('message_signature', msg) + + def test_meter_message_from_counter_field(self): + def compare(f, c, msg_f, msg): + self.assertEqual(msg, c) + msg = rpc.meter_message_from_counter(self.TEST_COUNTER, + 'not-so-secret', + 'src') + name_map = {'name': 'counter_name', + 'type': 'counter_type', + 'unit': 'counter_unit', + 'volume': 'counter_volume'} + for f in self.TEST_COUNTER._fields: + msg_f = name_map.get(f, f) + yield compare, f, getattr(self.TEST_COUNTER, f), msg_f, msg[msg_f] class TestPublish(base.TestCase): diff --git a/tests/storage/test_get_engine.py b/tests/storage/test_get_engine.py index 0ce2bd180..498b6f4de 100644 --- a/tests/storage/test_get_engine.py +++ b/tests/storage/test_get_engine.py @@ -19,22 +19,26 @@ """ import mox +import testtools from ceilometer import storage from ceilometer.storage import impl_log -def test_get_engine(): - conf = mox.Mox().CreateMockAnything() - conf.database.connection = 'log://localhost' - engine = storage.get_engine(conf) - assert isinstance(engine, impl_log.LogStorage) +class EngineTest(testtools.TestCase): + def test_get_engine(self): + conf = mox.Mox().CreateMockAnything() + conf.database = mox.Mox().CreateMockAnything() + conf.database.connection = 'log://localhost' + engine = storage.get_engine(conf) + self.assertIsInstance(engine, impl_log.LogStorage) -def test_get_engine_no_such_engine(): - conf = mox.Mox().CreateMockAnything() - conf.database.connection = 'no-such-engine://localhost' - try: - storage.get_engine(conf) - except RuntimeError as err: - assert 'no-such-engine' in unicode(err) + def test_get_engine_no_such_engine(self): + conf = mox.Mox().CreateMockAnything() + conf.database = mox.Mox().CreateMockAnything() + conf.database.connection = 'no-such-engine://localhost' + try: + storage.get_engine(conf) + except RuntimeError as err: + self.assertIn('no-such-engine', unicode(err)) diff --git a/tests/storage/test_impl_log.py b/tests/storage/test_impl_log.py index 100c66eac..41608366c 100644 --- a/tests/storage/test_impl_log.py +++ b/tests/storage/test_impl_log.py @@ -17,17 +17,17 @@ # under the License. """Tests for ceilometer/storage/impl_log.py """ - -import mox - +from ceilometer.tests import base from ceilometer.storage import impl_log -def test_get_connection(): - conf = mox.Mox().CreateMockAnything() - log_stg = impl_log.LogStorage() - conn = log_stg.get_connection(conf) - conn.record_metering_data({'counter_name': 'test', - 'resource_id': __name__, - 'counter_volume': 1, - }) +class ConnectionTest(base.TestCase): + + def test_get_connection(self): + conf = self.mox.CreateMockAnything() + log_stg = impl_log.LogStorage() + conn = log_stg.get_connection(conf) + conn.record_metering_data({'counter_name': 'test', + 'resource_id': __name__, + 'counter_volume': 1, + }) diff --git a/tests/storage/test_impl_sqlalchemy.py b/tests/storage/test_impl_sqlalchemy.py index 027490def..ddf6accc3 100644 --- a/tests/storage/test_impl_sqlalchemy.py +++ b/tests/storage/test_impl_sqlalchemy.py @@ -22,11 +22,7 @@ the tests. """ - -from oslo.config import cfg - from ceilometer.storage.sqlalchemy.models import table_args - from tests.storage import base @@ -99,6 +95,8 @@ class GetEventTest(base.GetEventTest, EventTestBase): pass -def test_model_table_args(): - cfg.CONF.database.connection = 'mysql://localhost' - assert table_args() +class ModelTest(SQLAlchemyEngineTestBase): + database_connection = 'mysql://localhost' + + def test_model_table_args(self): + self.assertIsNotNone(table_args()) diff --git a/tests/test_tools_notificationclient.py b/tests/test_tools_notificationclient.py deleted file mode 100644 index a164698a7..000000000 --- a/tests/test_tools_notificationclient.py +++ /dev/null @@ -1,38 +0,0 @@ -import os -import cPickle as pickle -from StringIO import StringIO -import sys -import types - -import mox - -from ceilometer.openstack.common.rpc import impl_kombu - -# The module being tested is part of the tools directory, -# so make sure it is in our import path. -sys.path.insert(0, os.path.normpath(os.path.join(os.path.dirname(__file__), - '..', 'tools'))) -import notificationclient - - -def test_send_messages(): - message = {'timestamp': 'date goes here', - 'event_type': 'compute.instance.exists', - # real messages have more fields... - } - input = StringIO(pickle.dumps(message)) - conn = mox.MockObject(impl_kombu.Connection) - conn.topic_send('notifications.info', message) - mox.Replay(conn) - notificationclient.send_messages(conn, 'notifications.info', input) - mox.Verify(conn) - - -def test_record_messages(): - conn = mox.MockObject(impl_kombu.Connection) - conn.declare_topic_consumer('notifications.info', - mox.IsA(types.FunctionType)) - conn.consume() - mox.Replay(conn) - notificationclient.record_messages(conn, 'notifications.info', StringIO()) - mox.Verify(conn) diff --git a/tests/test_utils.py b/tests/test_utils.py index 4b369d443..f551c2e24 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -26,22 +26,8 @@ from ceilometer.tests import base as tests_base from ceilometer import utils -def test_recursive_keypairs(): - data = {'a': 'A', - 'b': 'B', - 'nested': {'a': 'A', - 'b': 'B', - }, - } - pairs = list(utils.recursive_keypairs(data)) - assert pairs == [('a', 'A'), - ('b', 'B'), - ('nested:a', 'A'), - ('nested:b', 'B'), - ] - - class TestUtils(tests_base.TestCase): + def test_datetime_to_decimal(self): expected = 1356093296.12 utc_datetime = datetime.datetime.utcfromtimestamp(expected) @@ -54,3 +40,16 @@ class TestUtils(tests_base.TestCase): expected_datetime = datetime.datetime.utcfromtimestamp(expected) actual_datetime = utils.decimal_to_dt(dexpected) self.assertEqual(actual_datetime, expected_datetime) + + def test_recursive_keypairs(self): + data = {'a': 'A', + 'b': 'B', + 'nested': {'a': 'A', + 'b': 'B', + }, + } + pairs = list(utils.recursive_keypairs(data)) + self.assertEqual(pairs, [('a', 'A'), + ('b', 'B'), + ('nested:a', 'A'), + ('nested:b', 'B')]) diff --git a/tools/notificationclient.py b/tools/notificationclient.py deleted file mode 100755 index 33dbcd7b4..000000000 --- a/tools/notificationclient.py +++ /dev/null @@ -1,151 +0,0 @@ -#!/usr/bin/env python -# -*- encoding: utf-8 -*- -# -# Copyright © 2012 New Dream Network, LLC (DreamHost) -# -# Author: Doug Hellmann -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Command line tool for recording notification messages and replaying -them later. -""" - -import argparse -import logging -import cPickle as pickle -import sys - -#from nova import utils -from oslo.config import cfg - -from ceilometer.openstack.common import rpc - -LOG = logging.getLogger(__name__) - - -def record_messages(connection, topic, output): - """Listen to notification.info messages and pickle them to output.""" - def process_event(body): - print ('%s: %s' % - (body.get('timestamp'), - body.get('event_type', 'unknown event'), - )) - pickle.dump(body, output) - - connection.declare_topic_consumer(topic, process_event) - try: - connection.consume() - except KeyboardInterrupt: - pass - - -def monitor_messages(connection, topic): - """Listen to notification.info messages and print them.""" - def process_event(msg): - body = msg['args']['data'] - if 'resource_id' in body: - print ('%s: %s/%-15s: %s' % - (body.get('timestamp'), - body.get('resource_id'), - body.get('event_type'), - body.get('counter_volume'), - )) - else: - print ('%s: %s' % - (body.get('timestamp'), - body.get('event_type'), - )) - - connection.declare_topic_consumer(topic, process_event) - try: - connection.consume() - except KeyboardInterrupt: - pass - - -def send_messages(connection, topic, input): - """Read messages from the input and send them to the AMQP queue.""" - while True: - try: - body = pickle.load(input) - except EOFError: - break - print('%s: %s' % - (body.get('timestamp'), - body.get('event_type', 'unknown event'), - )) - connection.topic_send(topic, body) - - -def main(): - rpc.register_opts(cfg.CONF) - cfg.CONF.register_opts([ - cfg.StrOpt('datafile', - default=None, - help='Data file to read or write', - ), - cfg.BoolOpt('record', - help='Record events', - ), - cfg.BoolOpt('replay', - help='Replay events', - ), - ]) - - remaining_args = cfg.CONF(sys.argv) - #utils.monkey_patch() - - parser = argparse.ArgumentParser( - description='record or play back notification events', - ) - parser.add_argument('mode', - choices=('record', 'replay', 'monitor'), - help='operating mode', - ) - parser.add_argument('data_file', - default='msgs.dat', - nargs='?', - help='the data file to read or write', - ) - parser.add_argument('--topic', - default='notifications.info', - help='the exchange topic to listen for', - ) - args = parser.parse_args(remaining_args[1:]) - - console = logging.StreamHandler(sys.stderr) - console.setLevel(logging.DEBUG) - formatter = logging.Formatter('%(message)s') - console.setFormatter(formatter) - root_logger = logging.getLogger('') - root_logger.addHandler(console) - root_logger.setLevel(logging.DEBUG) - - connection = rpc.create_connection() - try: - if args.mode == 'replay': - with open(args.data_file, 'rb') as input: - send_messages(connection, args.topic, input) - elif args.mode == 'record': - with open(args.data_file, 'wb') as output: - record_messages(connection, args.topic, output) - elif args.mode == 'monitor': - monitor_messages(connection, args.topic) - finally: - connection.close() - - return 0 - -if __name__ == '__main__': - main()