diff --git a/tests/unit/test_verifier_db.py b/tests/unit/test_verifier_db.py index da75aed..8b27881 100644 --- a/tests/unit/test_verifier_db.py +++ b/tests/unit/test_verifier_db.py @@ -79,9 +79,45 @@ class VerifierTestCase(unittest.TestCase): models.InstanceExists.objects = self.mox.CreateMockAnything() self.mox.StubOutWithMock(models, 'JsonReport', use_mock_anything=True) models.JsonReport.objects = self.mox.CreateMockAnything() + self._setup_verifier() + + def _setup_verifier(self): + self.config = { + "tick_time": 30, + "settle_time": 5, + "settle_units": "minutes", + "pool_size": 2, + "enable_notifications": False, + } + self.pool = self.mox.CreateMockAnything() + self.verifier = dbverifier.Verifier(self.config, pool=self.pool) + + self.config_notif = { + "tick_time": 30, + "settle_time": 5, + "settle_units": "minutes", + "pool_size": 2, + "enable_notifications": True, + "rabbit": { + "durable_queue": False, + "host": "10.0.0.1", + "port": 5672, + "userid": "rabbit", + "password": "rabbit", + "virtual_host": "/", + "exchange_name": "stacktach", + } + } + self.pool_notif = self.mox.CreateMockAnything() + self.verifier_notif = dbverifier.Verifier(self.config_notif, + pool=self.pool_notif) def tearDown(self): self.mox.UnsetStubs() + self.verifier = None + self.pool = None + self.verifier_notif = None + self.pool_notif = None def test_verify_for_launch(self): exist = self.mox.CreateMockAnything() @@ -574,7 +610,6 @@ class VerifierTestCase(unittest.TestCase): self.mox.VerifyAll() def test_verify_for_range_without_callback(self): - pool = self.mox.CreateMockAnything() when_max = datetime.datetime.utcnow() results = self.mox.CreateMockAnything() models.InstanceExists.objects.select_related().AndReturn(results) @@ -593,10 +628,12 @@ class VerifierTestCase(unittest.TestCase): results.__iter__().AndReturn([exist1, exist2].__iter__()) exist1.save() exist2.save() - pool.apply_async(dbverifier._verify, args=(exist1,), callback=None) - pool.apply_async(dbverifier._verify, args=(exist2,), callback=None) + self.pool.apply_async(dbverifier._verify, args=(exist1,), + callback=None) + self.pool.apply_async(dbverifier._verify, args=(exist2,), + callback=None) self.mox.ReplayAll() - dbverifier.verify_for_range(pool, when_max) + self.verifier.verify_for_range(when_max) self.assertEqual(exist1.status, 'verifying') self.assertEqual(exist2.status, 'verifying') self.mox.VerifyAll() @@ -622,10 +659,12 @@ class VerifierTestCase(unittest.TestCase): results.__iter__().AndReturn([exist1, exist2].__iter__()) exist1.save() exist2.save() - pool.apply_async(dbverifier._verify, args=(exist1,), callback=callback) - pool.apply_async(dbverifier._verify, args=(exist2,), callback=callback) + self.pool.apply_async(dbverifier._verify, args=(exist1,), + callback=callback) + self.pool.apply_async(dbverifier._verify, args=(exist2,), + callback=callback) self.mox.ReplayAll() - dbverifier.verify_for_range(pool, when_max, callback=callback) + self.verifier.verify_for_range(when_max, callback=callback) self.assertEqual(exist1.status, 'verifying') self.assertEqual(exist2.status, 'verifying') self.mox.VerifyAll() @@ -702,140 +741,63 @@ class VerifierTestCase(unittest.TestCase): self.mox.VerifyAll() def test_run_notifications(self): - config = { - "tick_time": 30, - "settle_time": 5, - "settle_units": "minutes", - "pool_size": 2, - "enable_notifications": True, - "rabbit": { - "durable_queue": False, - "host": "10.0.0.1", - "port": 5672, - "userid": "rabbit", - "password": "rabbit", - "virtual_host": "/", - "exchange_name": "stacktach" - } - } - self.mox.StubOutWithMock(multiprocessing, 'Pool') - pool = self.mox.CreateMockAnything() - multiprocessing.Pool(2).AndReturn(pool) self.mox.StubOutWithMock(dbverifier, '_create_exchange') exchange = self.mox.CreateMockAnything() dbverifier._create_exchange('stacktach', 'topic', durable=False)\ .AndReturn(exchange) self.mox.StubOutWithMock(dbverifier, '_create_connection') conn = self.mox.CreateMockAnything() - dbverifier._create_connection(config).AndReturn(conn) + dbverifier._create_connection(self.config_notif).AndReturn(conn) conn.__enter__().AndReturn(conn) - self.mox.StubOutWithMock(dbverifier, '_run') - dbverifier._run(config, pool, callback=mox.IgnoreArg()) + self.mox.StubOutWithMock(self.verifier_notif, '_run') + self.verifier_notif._run(callback=mox.IgnoreArg()) conn.__exit__(None, None, None) self.mox.ReplayAll() - dbverifier.run(config) + self.verifier_notif.run() self.mox.VerifyAll() def test_run_notifications_with_routing_keys(self): - config = { - "tick_time": 30, - "settle_time": 5, - "settle_units": "minutes", - "pool_size": 2, - "enable_notifications": True, - "rabbit": { - "durable_queue": False, - "host": "10.0.0.1", - "port": 5672, - "userid": "rabbit", - "password": "rabbit", - "virtual_host": "/", - "exchange_name": "stacktach", - } - } - self.mox.StubOutWithMock(multiprocessing, 'Pool') - pool = self.mox.CreateMockAnything() - multiprocessing.Pool(2).AndReturn(pool) self.mox.StubOutWithMock(dbverifier, '_create_exchange') exchange = self.mox.CreateMockAnything() dbverifier._create_exchange('stacktach', 'topic', durable=False) \ .AndReturn(exchange) self.mox.StubOutWithMock(dbverifier, '_create_connection') conn = self.mox.CreateMockAnything() - dbverifier._create_connection(config).AndReturn(conn) + dbverifier._create_connection(self.config_notif).AndReturn(conn) conn.__enter__().AndReturn(conn) - self.mox.StubOutWithMock(dbverifier, '_run') - dbverifier._run(config, pool, callback=mox.IgnoreArg()) + self.mox.StubOutWithMock(self.verifier_notif, '_run') + self.verifier_notif._run(callback=mox.IgnoreArg()) conn.__exit__(None, None, None) self.mox.ReplayAll() - dbverifier.run(config) + self.verifier_notif.run() self.mox.VerifyAll() def test_run_no_notifications(self): - config = { - "tick_time": 30, - "settle_time": 5, - "settle_units": "minutes", - "pool_size": 2, - "enable_notifications": False, - } - self.mox.StubOutWithMock(multiprocessing, 'Pool') - pool = self.mox.CreateMockAnything() - multiprocessing.Pool(2).AndReturn(pool) - self.mox.StubOutWithMock(dbverifier, '_run') - dbverifier._run(config, pool) + self.mox.StubOutWithMock(self.verifier, '_run') + self.verifier._run() self.mox.ReplayAll() - dbverifier.run(config) + self.verifier.run() self.mox.VerifyAll() def test_run_once_notifications(self): - config = { - "tick_time": 30, - "settle_time": 5, - "settle_units": "minutes", - "pool_size": 2, - "enable_notifications": True, - "rabbit": { - "durable_queue": False, - "host": "10.0.0.1", - "port": 5672, - "userid": "rabbit", - "password": "rabbit", - "virtual_host": "/", - "exchange_name": "stacktach" - } - } - self.mox.StubOutWithMock(multiprocessing, 'Pool') - pool = self.mox.CreateMockAnything() - multiprocessing.Pool(2).AndReturn(pool) self.mox.StubOutWithMock(dbverifier, '_create_exchange') exchange = self.mox.CreateMockAnything() dbverifier._create_exchange('stacktach', 'topic', durable=False) \ .AndReturn(exchange) self.mox.StubOutWithMock(dbverifier, '_create_connection') conn = self.mox.CreateMockAnything() - dbverifier._create_connection(config).AndReturn(conn) + dbverifier._create_connection(self.config_notif).AndReturn(conn) conn.__enter__().AndReturn(conn) - self.mox.StubOutWithMock(dbverifier, '_run_once') - dbverifier._run_once(config, pool, callback=mox.IgnoreArg()) + self.mox.StubOutWithMock(self.verifier_notif, '_run_once') + self.verifier_notif._run_once(callback=mox.IgnoreArg()) conn.__exit__(None, None, None) self.mox.ReplayAll() - dbverifier.run_once(config) + self.verifier_notif.run_once() self.mox.VerifyAll() def test_run_once_no_notifications(self): - config = { - "tick_time": 30, - "settle_time": 5, - "settle_units": "minutes", - "pool_size": 2, - "enable_notifications": False, - } - self.mox.StubOutWithMock(multiprocessing, 'Pool') - pool = self.mox.CreateMockAnything() - multiprocessing.Pool(2).AndReturn(pool) - self.mox.StubOutWithMock(dbverifier, '_run_once') - dbverifier._run_once(config, pool) + self.mox.StubOutWithMock(self.verifier, '_run_once') + self.verifier._run_once() self.mox.ReplayAll() - dbverifier.run_once(config) + self.verifier.run_once() self.mox.VerifyAll() diff --git a/verifier/dbverifier.py b/verifier/dbverifier.py index 9651d6c..1ed74b2 100644 --- a/verifier/dbverifier.py +++ b/verifier/dbverifier.py @@ -263,6 +263,26 @@ def _verify_with_reconciled_data(exist, ex): delete_type="InstanceReconcile") +def _attempt_reconciled_verify(exist, orig_e): + verified = False + try: + # Attempt to verify against reconciled data + _verify_with_reconciled_data(exist, orig_e) + verified = True + _mark_exist_verified(exist) + except NotFound, rec_e: + # No reconciled data, just mark it failed + _mark_exist_failed(exist, reason=str(orig_e)) + except VerificationException, rec_e: + # Verification failed against reconciled data, mark it failed + # using the second failure. + _mark_exist_failed(exist, reason=str(rec_e)) + except Exception, rec_e: + _mark_exist_failed(exist, reason=rec_e.__class__.__name__) + LOG.exception(rec_e) + return verified + + def _verify(exist): verified = False try: @@ -276,21 +296,7 @@ def _verify(exist): _mark_exist_verified(exist) except VerificationException, orig_e: # Something is wrong with the InstanceUsage record - try: - # Attempt to verify against reconciled data - _verify_with_reconciled_data(exist, orig_e) - verified = True - _mark_exist_verified(exist) - except NotFound, rec_e: - # No reconciled data, just mark it failed - _mark_exist_failed(exist, reason=str(orig_e)) - except VerificationException, rec_e: - # Verification failed against reconciled data, mark it failed - # using the second failure. - _mark_exist_failed(exist, reason=str(rec_e)) - except Exception, rec_e: - _mark_exist_failed(exist, reason=rec_e.__class__.__name__) - LOG.exception(rec_e) + verified = _attempt_reconciled_verify(exist, orig_e) except Exception, e: _mark_exist_failed(exist, reason=e.__class__.__name__) LOG.exception(e) @@ -298,54 +304,6 @@ def _verify(exist): return verified, exist -results = [] - - -def verify_for_range(pool, ending_max, callback=None): - exists = _list_exists(ending_max=ending_max, - status=models.InstanceExists.PENDING) - count = exists.count() - added = 0 - update_interval = datetime.timedelta(seconds=30) - next_update = datetime.datetime.utcnow() + update_interval - LOG.info("Adding %s exists to queue." % count) - while added < count: - for exist in exists[0:1000]: - exist.status = models.InstanceExists.VERIFYING - exist.save() - result = pool.apply_async(_verify, args=(exist,), - callback=callback) - results.append(result) - added += 1 - if datetime.datetime.utcnow() > next_update: - values = ((added,) + clean_results()) - msg = "N: %s, P: %s, S: %s, E: %s" % values - LOG.info(msg) - next_update = datetime.datetime.utcnow() + update_interval - - return count - - -def clean_results(): - global results - - pending = [] - finished = 0 - successful = 0 - - for result in results: - if result.ready(): - finished += 1 - if result.successful(): - successful += 1 - else: - pending.append(result) - - results = pending - errored = finished - successful - return len(results), successful, errored - - def _send_notification(message, routing_key, connection, exchange): with kombu.pools.producers[connection].acquire(block=True) as producer: kombu.common.maybe_declare(exchange, producer.channel) @@ -382,81 +340,122 @@ def _create_connection(config): return kombu.connection.BrokerConnection(**conn_params) -def _run(config, pool, callback=None): - tick_time = config['tick_time'] - settle_units = config['settle_units'] - settle_time = config['settle_time'] - while True: - with transaction.commit_on_success(): - now = datetime.datetime.utcnow() - kwargs = {settle_units: settle_time} - ending_max = now - datetime.timedelta(**kwargs) - new = verify_for_range(pool, ending_max, callback=callback) +class Verifier(object): + def __init__(self, config, pool=None): + self.config = config + self.pool = pool or multiprocessing.Pool(self.config['pool_size']) + self.results = [] - msg = "N: %s, P: %s, S: %s, E: %s" % ((new,) + clean_results()) - LOG.info(msg) - sleep(tick_time) + def clean_results(self): + pending = [] + finished = 0 + successful = 0 + for result in self.results: + if result.ready(): + finished += 1 + if result.successful(): + successful += 1 + else: + pending.append(result) -def run(config): - pool = multiprocessing.Pool(config['pool_size']) + self.results = pending + errored = finished - successful + return len(self.results), successful, errored - if config['enable_notifications']: - exchange = _create_exchange(config['rabbit']['exchange_name'], - 'topic', - durable=config['rabbit']['durable_queue']) - routing_keys = None - if config['rabbit'].get('routing_keys') is not None: - routing_keys = config['rabbit']['routing_keys'] + def verify_for_range(self, ending_max, callback=None): + exists = _list_exists(ending_max=ending_max, + status=models.InstanceExists.PENDING) + count = exists.count() + added = 0 + update_interval = datetime.timedelta(seconds=30) + next_update = datetime.datetime.utcnow() + update_interval + LOG.info("Adding %s exists to queue." % count) + while added < count: + for exist in exists[0:1000]: + exist.status = models.InstanceExists.VERIFYING + exist.save() + result = self.pool.apply_async(_verify, args=(exist,), + callback=callback) + self.results.append(result) + added += 1 + if datetime.datetime.utcnow() > next_update: + values = ((added,) + self.clean_results()) + msg = "N: %s, P: %s, S: %s, E: %s" % values + LOG.info(msg) + next_update = datetime.datetime.utcnow() + update_interval + return count - with _create_connection(config) as conn: - def callback(result): - (verified, exist) = result - if verified: - send_verified_notification(exist, conn, exchange, - routing_keys=routing_keys) + def _run(self, callback=None): + tick_time = self.config['tick_time'] + settle_units = self.config['settle_units'] + settle_time = self.config['settle_time'] + while True: + with transaction.commit_on_success(): + now = datetime.datetime.utcnow() + kwargs = {settle_units: settle_time} + ending_max = now - datetime.timedelta(**kwargs) + new = self.verify_for_range(ending_max, + callback=callback) - _run(config, pool, callback=callback) - else: - _run(config, pool) + values = ((new,) + self.clean_results()) + msg = "N: %s, P: %s, S: %s, E: %s" % values + LOG.info(msg) + sleep(tick_time) + def run(self): + if self.config['enable_notifications']: + exchange = _create_exchange(self.config['rabbit']['exchange_name'], + 'topic', + durable=self.config['rabbit']['durable_queue']) + routing_keys = None + if self.config['rabbit'].get('routing_keys') is not None: + routing_keys = self.config['rabbit']['routing_keys'] -def _run_once(config, pool, callback=None): - tick_time = config['tick_time'] - settle_units = config['settle_units'] - settle_time = config['settle_time'] - now = datetime.datetime.utcnow() - kwargs = {settle_units: settle_time} - ending_max = now - datetime.timedelta(**kwargs) - new = verify_for_range(pool, ending_max, callback=callback) + with _create_connection(self.config) as conn: + def callback(result): + (verified, exist) = result + if verified: + send_verified_notification(exist, conn, exchange, + routing_keys=routing_keys) - LOG.info("Verifying %s exist events" % new) - while len(results) > 0: - LOG.info("P: %s, F: %s, E: %s" % clean_results()) - sleep(tick_time) + self._run(callback=callback) + else: + self._run() + def _run_once(self, callback=None): + tick_time = self.config['tick_time'] + settle_units = self.config['settle_units'] + settle_time = self.config['settle_time'] + now = datetime.datetime.utcnow() + kwargs = {settle_units: settle_time} + ending_max = now - datetime.timedelta(**kwargs) + new = self.verify_for_range(ending_max, callback=callback) -def run_once(config): - pool = multiprocessing.Pool(config['pool_size']) + LOG.info("Verifying %s exist events" % new) + while len(self.results) > 0: + LOG.info("P: %s, F: %s, E: %s" % self.clean_results()) + sleep(tick_time) - if config['enable_notifications']: - exchange = _create_exchange(config['rabbit']['exchange_name'], - 'topic', - durable=config['rabbit']['durable_queue']) - routing_keys = None - if config['rabbit'].get('routing_keys') is not None: - routing_keys = config['rabbit']['routing_keys'] + def run_once(self): + if self.config['enable_notifications']: + exchange = _create_exchange(self.config['rabbit']['exchange_name'], + 'topic', + durable=self.config['rabbit']['durable_queue']) + routing_keys = None + if self.config['rabbit'].get('routing_keys') is not None: + routing_keys = self.config['rabbit']['routing_keys'] - with _create_connection(config) as conn: - def callback(result): - (verified, exist) = result - if verified: - send_verified_notification(exist, conn, exchange, - routing_keys=routing_keys) + with _create_connection(self.config) as conn: + def callback(result): + (verified, exist) = result + if verified: + send_verified_notification(exist, conn, exchange, + routing_keys=routing_keys) - _run_once(config, pool, callback=callback) - else: - _run_once(config, pool) + self._run_once(callback=callback) + else: + self._run_once() if __name__ == '__main__': @@ -486,7 +485,8 @@ if __name__ == '__main__': config = {'tick_time': args.tick_time, 'settle_time': args.settle_time, 'settle_units': args.settle_units, 'pool_size': args.pool_size} + verifier = Verifier(config) if args.run_once: - run_once(config) + verifier.run_once() else: - run(config) + verifier.run() diff --git a/verifier/start_verifier.py b/verifier/start_verifier.py index 625497c..aab3a29 100644 --- a/verifier/start_verifier.py +++ b/verifier/start_verifier.py @@ -59,7 +59,13 @@ if __name__ == '__main__': with open(config_filename, "r") as f: config = json.load(f) - process = Process(target=dbverifier.run, args=(config, )) + def make_and_start_verifier(config): + # Gotta create it and run it this way so things don't get + # lost when the process is forked. + verifier = dbverifier.Verifier(config) + verifier.run() + + process = Process(target=make_and_start_verifier, args=(config,)) process.start() signal.signal(signal.SIGINT, kill_time) signal.signal(signal.SIGTERM, kill_time)