diff --git a/stacktach/models.py b/stacktach/models.py index ee1b187..58f60d3 100644 --- a/stacktach/models.py +++ b/stacktach/models.py @@ -563,11 +563,13 @@ class ImageExists(models.Model): self.status = new_status @staticmethod - def find_and_group_by_owner_and_raw_id(ending_max, status): + def find_and_group_by_owner_and_raw_id(ending_max, status, batchsize=None): params = {'audit_period_ending__lte': dt.dt_to_decimal(ending_max), 'status': status} ordered_exists = ImageExists.objects.select_related().\ filter(**params).order_by('owner') + if batchsize: + ordered_exists = ordered_exists[:batchsize] result = {} for exist in ordered_exists: key = "%s-%s" % (exist.owner, exist.raw_id) diff --git a/tests/unit/test_glance_verifier.py b/tests/unit/test_glance_verifier.py index 1bb258d..5733ab9 100644 --- a/tests/unit/test_glance_verifier.py +++ b/tests/unit/test_glance_verifier.py @@ -526,9 +526,11 @@ class GlanceVerifierTestCase(StacktachBaseTestCase): results = {'owner1': [exist1, exist2], 'owner2': [exist3]} sent_results = {'owner1': [exist4], 'owner2': [exist5]} models.ImageExists.find_and_group_by_owner_and_raw_id( + batchsize=1000, ending_max=when_max, status=models.ImageExists.SENT_UNVERIFIED).AndReturn(sent_results) models.ImageExists.find_and_group_by_owner_and_raw_id( + batchsize=1000, ending_max=when_max, status=models.ImageExists.PENDING).AndReturn(results) exist1.save() @@ -569,9 +571,11 @@ class GlanceVerifierTestCase(StacktachBaseTestCase): exist3 = self.mox.CreateMockAnything() results = {'owner1': [exist1, exist2], 'owner2': [exist3]} models.ImageExists.find_and_group_by_owner_and_raw_id( + batchsize=1000, ending_max=when_max, - status=models.ImageExists.SENT_UNVERIFIED).AndReturn([]) + status=models.ImageExists.SENT_UNVERIFIED).AndReturn({}) models.ImageExists.find_and_group_by_owner_and_raw_id( + batchsize=1000, ending_max=when_max, status=models.ImageExists.PENDING).AndReturn(results) exist1.save() diff --git a/tests/unit/test_nova_verifier.py b/tests/unit/test_nova_verifier.py index 636c0f0..a81e280 100644 --- a/tests/unit/test_nova_verifier.py +++ b/tests/unit/test_nova_verifier.py @@ -860,11 +860,13 @@ class NovaVerifierVerifyTestCase(StacktachBaseTestCase): ending_max=when_max, status='sent_unverified').AndReturn(sent_results) models.InstanceExists.find( ending_max=when_max, status='pending').AndReturn(results) + sent_results.__getslice__(0, 1000).AndReturn(sent_results) + results.__getslice__(0, 1000).AndReturn(results) sent_results.count().AndReturn(0) results.count().AndReturn(2) exist1 = self.mox.CreateMockAnything() exist2 = self.mox.CreateMockAnything() - results.__getslice__(0, 1000).AndReturn(results) + sent_results.__iter__().AndReturn([].__iter__()) results.__iter__().AndReturn([exist1, exist2].__iter__()) exist1.update_status('verifying') exist2.update_status('verifying') @@ -896,11 +898,13 @@ class NovaVerifierVerifyTestCase(StacktachBaseTestCase): ending_max=when_max, status='sent_unverified').AndReturn(sent_results) models.InstanceExists.find( ending_max=when_max, status='pending').AndReturn(results) + sent_results.__getslice__(0, 1000).AndReturn(sent_results) + results.__getslice__(0, 1000).AndReturn(results) sent_results.count().AndReturn(0) results.count().AndReturn(2) exist1 = self.mox.CreateMockAnything() exist2 = self.mox.CreateMockAnything() - results.__getslice__(0, 1000).AndReturn(results) + sent_results.__iter__().AndReturn([].__iter__()) results.__iter__().AndReturn([exist1, exist2].__iter__()) exist1.update_status('verifying') exist2.update_status('verifying') @@ -928,12 +932,14 @@ class NovaVerifierVerifyTestCase(StacktachBaseTestCase): ending_max=when_max, status='sent_unverified').AndReturn(sent_results) models.InstanceExists.find( ending_max=when_max, status='pending').AndReturn(results) + sent_results.__getslice__(0, 1000).AndReturn(sent_results) + results.__getslice__(0, 1000).AndReturn(results) sent_results.count().AndReturn(2) results.count().AndReturn(0) exist1 = self.mox.CreateMockAnything() exist2 = self.mox.CreateMockAnything() - sent_results.__getslice__(0, 1000).AndReturn(sent_results) sent_results.__iter__().AndReturn([exist1, exist2].__iter__()) + results.__iter__().AndReturn([].__iter__()) exist1.update_status('sent_verifying') exist2.update_status('sent_verifying') exist1.save() diff --git a/tests/unit/utils.py b/tests/unit/utils.py index f73593c..d72005d 100644 --- a/tests/unit/utils.py +++ b/tests/unit/utils.py @@ -193,7 +193,7 @@ class FakeVerifierConfig(object): self.nova_event_type = lambda: nova_event_type self.glance_event_type = lambda: glance_event_type self.flavor_field_name = lambda: flavor_field_name - + self.batchsize = lambda: 1000 def make_verifier_config(notifs): topics = {'exchange': ['notifications.info']} diff --git a/verifier/base_verifier.py b/verifier/base_verifier.py index d3e453d..30424f4 100644 --- a/verifier/base_verifier.py +++ b/verifier/base_verifier.py @@ -18,6 +18,7 @@ import datetime import decimal import os import re +import signal import sys import time import multiprocessing @@ -108,6 +109,7 @@ def _is_alphanumeric(attr_name, attr_value, exist_id, instance_uuid): class Verifier(object): + def __init__(self, config, pool=None, reconciler=None, stats=None): self.config = config self.pool = pool or multiprocessing.Pool(config.pool_size()) @@ -115,10 +117,14 @@ class Verifier(object): self.reconciler = reconciler self.results = [] self.failed = [] + self.batchsize = config.batchsize() if stats is None: self.stats = {} else: self.stats = stats + self.update_interval = datetime.timedelta(seconds=30) + self.next_update = datetime.datetime.utcnow() + self.update_interval + self._do_run = True def clean_results(self): pending = [] @@ -140,8 +146,39 @@ class Verifier(object): errored = finished - successful return len(self.results), successful, errored + def check_results(self, new_added, force=False): + tick_time = self.config.tick_time() + if ((datetime.datetime.utcnow() > self.next_update) + or force or (len(self.results) > self.batchsize)): + values = ((self.exchange(), new_added,) + self.clean_results()) + msg = "%s: N: %s, P: %s, S: %s, E: %s" % values + _get_child_logger().info(msg) + while len(self.results) > (self.batchsize * 0.75): + msg = "%s: Waiting on event processing. Pending: %s" % ( + self.exchange(), len(self.results)) + _get_child_logger().info(msg) + time.sleep(tick_time) + self.clean_results() + self.next_update = datetime.datetime.utcnow() + self.update_interval + + def handle_signal(self, signal_number): + log = _get_child_logger() + if signal_number in (signal.SIGTERM, signal.SIGKILL): + self._do_run = False + log.info("%s verifier cleaning up for shutdown." % self.exchange()) + if signal_number == signal.SIGUSR1: + info = """ + %s verifier: + PID: %s Parent PID: + Last watchdog check: %s + # of items processed: %s + """ % (self.exchange(), os.getpid(), os.getppid(), + self.stats['timestamp'], + self.stats.get('total_processed',0)) + log.info(info) + def _keep_running(self): - return True + return self._do_run def _utcnow(self): return datetime.datetime.utcnow() @@ -157,11 +194,9 @@ class Verifier(object): kwargs = {settle_units: settle_time} ending_max = now - datetime.timedelta(**kwargs) new = self.verify_for_range(ending_max, callback=callback) - values = ((self.exchange(), new,) + self.clean_results()) + self.check_results(new, force=True) if self.reconciler: self.reconcile_failed() - msg = "%s: N: %s, P: %s, S: %s, E: %s" % values - _get_child_logger().info(msg) time.sleep(tick_time) def run(self): diff --git a/verifier/config.py b/verifier/config.py index b9f4eee..695e1e3 100644 --- a/verifier/config.py +++ b/verifier/config.py @@ -105,6 +105,8 @@ def nova_event_type(): def glance_event_type(): return config.get('glance_event_type', 'image.exists.verified') +def batchsize(): + return config.get('batchsize', 1000) def flavor_field_name(): return config['flavor_field_name'] diff --git a/verifier/glance_verifier.py b/verifier/glance_verifier.py index 2b2a9d3..bc4e080 100644 --- a/verifier/glance_verifier.py +++ b/verifier/glance_verifier.py @@ -158,42 +158,35 @@ def _verify(exists): class GlanceVerifier(Verifier): - def __init__(self, config, pool=None, stats=None): - super(GlanceVerifier, self).__init__(config, pool=pool, stats=stats) def verify_exists(self, grouped_exists, callback, verifying_status): count = len(grouped_exists) added = 0 - update_interval = datetime.timedelta(seconds=30) - next_update = datetime.datetime.utcnow() + update_interval _get_child_logger().info("glance: Adding %s per-owner exists to queue." % count) - while added < count: - for exists in grouped_exists.values(): - for exist in exists: - exist.status = verifying_status - exist.save() - result = self.pool.apply_async(_verify, args=(exists,), - callback=callback) - self.results.append(result) - added += 1 - if datetime.datetime.utcnow() > next_update: - values = ((added,) + self.clean_results()) - msg = "glance: N: %s, P: %s, S: %s, E: %s" % values - _get_child_logger().info(msg) - next_update = datetime.datetime.utcnow() + update_interval + for exists in grouped_exists.values(): + for exist in exists: + exist.status = verifying_status + exist.save() + result = self.pool.apply_async(_verify, args=(exists,), + callback=callback) + self.results.append(result) + added += 1 + self.check_results(added) return count def verify_for_range(self, ending_max, callback=None): unsent_exists_grouped_by_owner_and_rawid = \ models.ImageExists.find_and_group_by_owner_and_raw_id( ending_max=ending_max, - status=models.ImageExists.SENT_UNVERIFIED) + status=models.ImageExists.SENT_UNVERIFIED, + batchsize=self.batchsize) unsent_count = self.verify_exists(unsent_exists_grouped_by_owner_and_rawid, None, models.ImageExists.SENT_VERIFYING) exists_grouped_by_owner_and_rawid = \ models.ImageExists.find_and_group_by_owner_and_raw_id( ending_max=ending_max, - status=models.ImageExists.PENDING) + status=models.ImageExists.PENDING, + batchsize=self.batchsize) count = self.verify_exists(exists_grouped_by_owner_and_rawid, callback, models.ImageExists.VERIFYING) diff --git a/verifier/nova_verifier.py b/verifier/nova_verifier.py index 6ea33f8..999c959 100644 --- a/verifier/nova_verifier.py +++ b/verifier/nova_verifier.py @@ -322,36 +322,31 @@ class NovaVerifier(base_verifier.Verifier): def verify_exists(self, callback, exists, verifying_status): count = exists.count() added = 0 - update_interval = datetime.timedelta(seconds=30) - next_update = datetime.datetime.utcnow() + update_interval _get_child_logger().info("nova: Adding %s exists to queue." % count) - while added < count: - for exist in exists[0:1000]: - exist.update_status(verifying_status) - exist.save() - validation_level = self.config.validation_level() - result = self.pool.apply_async( - _verify, args=(exist, validation_level), - callback=callback) - self.results.append(result) - added += 1 - if datetime.datetime.utcnow() > next_update: - values = ((added,) + self.clean_results()) - msg = "nova: N: %s, P: %s, S: %s, E: %s" % values - _get_child_logger().info(msg) - next_update = datetime.datetime.utcnow() + update_interval + for exist in exists: + exist.update_status(verifying_status) + exist.save() + validation_level = self.config.validation_level() + result = self.pool.apply_async( + _verify, args=(exist, validation_level), + callback=callback) + self.results.append(result) + added += 1 + self.check_results(added) return count def verify_for_range(self, ending_max, callback=None): sent_unverified_exists = models.InstanceExists.find( ending_max=ending_max, status= models.InstanceExists.SENT_UNVERIFIED) + sent_unverified_exists = sent_unverified_exists[:self.batchsize] sent_unverified_count = self.verify_exists(None, sent_unverified_exists, models.InstanceExists. SENT_VERIFYING) exists = models.InstanceExists.find( ending_max=ending_max, status=models.InstanceExists.PENDING) + exists = exists[:self.batchsize] count = self.verify_exists(callback, exists, models.InstanceExists.VERIFYING) return count+sent_unverified_count diff --git a/verifier/start_verifier.py b/verifier/start_verifier.py index 762fb6c..b4f9564 100644 --- a/verifier/start_verifier.py +++ b/verifier/start_verifier.py @@ -74,6 +74,13 @@ def make_and_start_verifier(exchange, stats=None): verifier = glance_verifier.GlanceVerifier(verifier_config, stats=stats) + def sig_handler(signal_number, frame): + verifier.handle_signal(signal_number) + + signal.signal(signal.SIGINT, sig_handler) + signal.signal(signal.SIGTERM, sig_handler) + signal.signal(signal.SIGUSR1, sig_handler) + verifier.run() @@ -148,10 +155,21 @@ def stop_all(): processes[pname]['pid'] = 0 -def kill_time(signal, frame): +def signal_all(signal_number): + procs = sorted(processes.keys()) + for pname in procs: + if is_alive(processes[pname]): + pid = processes[pname]['pid'] + os.kill(pid, signal_number) + + +def kill_time(signal_number, frame): global RUNNING - RUNNING = False - stop_all() + if signal_number in (signal.SIGTERM, signal.SIGKILL): + RUNNING = False + stop_all() + if signal_number == signal.SIGUSR1: + signal_all(signal.SIGUSR1) if __name__ == '__main__': @@ -166,6 +184,7 @@ if __name__ == '__main__': signal.signal(signal.SIGINT, kill_time) signal.signal(signal.SIGTERM, kill_time) + signal.signal(signal.SIGUSR1, kill_time) logger.info("Starting Verifiers...") while RUNNING: