From a2400d554254001c1dad6f98fa72c83d2072447c Mon Sep 17 00:00:00 2001 From: Andrew Melton Date: Tue, 5 Mar 2013 16:16:34 -0500 Subject: [PATCH] Nova exists event verifier --- run_tests.sh | 2 +- stacktach/models.py | 2 + tests/unit/test_verifier_db.py | 423 +++++++++++++++++++++++++++++++++ verifier/__init__.py | 33 +++ verifier/dbverifier.py | 290 ++++++++++++++++++++++ verifier/start_verifier.py | 46 ++++ verifier/verifier.sh | 26 ++ 7 files changed, 821 insertions(+), 1 deletion(-) create mode 100644 tests/unit/test_verifier_db.py create mode 100644 verifier/__init__.py create mode 100644 verifier/dbverifier.py create mode 100644 verifier/start_verifier.py create mode 100755 verifier/verifier.sh diff --git a/run_tests.sh b/run_tests.sh index d5e661f..26e6463 100755 --- a/run_tests.sh +++ b/run_tests.sh @@ -1,2 +1,2 @@ #!/bin/bash -nosetests tests --exclude-dir=stacktach --with-coverage --cover-package=stacktach,worker +nosetests tests --exclude-dir=stacktach --with-coverage --cover-package=stacktach,worker,verifier diff --git a/stacktach/models.py b/stacktach/models.py index 46f4597..935f019 100644 --- a/stacktach/models.py +++ b/stacktach/models.py @@ -100,10 +100,12 @@ class InstanceDeletes(models.Model): class InstanceExists(models.Model): PENDING = 'pending' + VERIFYING = 'verifying' VERIFIED = 'verified' FAILED = 'failed' STATUS_CHOICES = [ (PENDING, 'Pending Verification'), + (VERIFYING, 'Currently Being Verified'), (VERIFIED, 'Passed Verification'), (FAILED, 'Failed Verification'), ] diff --git a/tests/unit/test_verifier_db.py b/tests/unit/test_verifier_db.py new file mode 100644 index 0000000..fe2bd11 --- /dev/null +++ b/tests/unit/test_verifier_db.py @@ -0,0 +1,423 @@ +import datetime +import decimal +import json +import unittest + +import mox + +from stacktach import datetime_to_decimal as dt +from stacktach import models +import utils +from utils import INSTANCE_ID_1 +from utils import INSTANCE_ID_2 +from utils import REQUEST_ID_1 + +from verifier import dbverifier +from verifier import AmbiguousResults +from verifier import FieldMismatch +from verifier import NotFound +from verifier import VerificationException + + +class VerifierTestCase(unittest.TestCase): + def setUp(self): + self.mox = mox.Mox() + self.mox.StubOutWithMock(models, 'RawData', use_mock_anything=True) + models.RawData.objects = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(models, 'Deployment', use_mock_anything=True) + models.Deployment.objects = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(models, 'Lifecycle', use_mock_anything=True) + models.Lifecycle.objects = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(models, 'Timing', use_mock_anything=True) + models.Timing.objects = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(models, 'RequestTracker', + use_mock_anything=True) + models.RequestTracker.objects = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(models, 'InstanceUsage', + use_mock_anything=True) + models.InstanceUsage.objects = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(models, 'InstanceDeletes', + use_mock_anything=True) + models.InstanceDeletes.objects = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(models, 'InstanceExists', + use_mock_anything=True) + models.InstanceExists.objects = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(models, 'JsonReport', use_mock_anything=True) + models.JsonReport.objects = self.mox.CreateMockAnything() + + def tearDown(self): + self.mox.UnsetStubs() + + def test_verify_for_launch(self): + exist = self.mox.CreateMockAnything() + exist.usage = self.mox.CreateMockAnything() + exist.launched_at = decimal.Decimal('1.1') + exist.instance_type_id = 2 + exist.usage.launched_at = decimal.Decimal('1.1') + exist.usage.instance_type_id = 2 + self.mox.ReplayAll() + + dbverifier._verify_for_launch(exist) + self.mox.VerifyAll() + + def test_verify_for_launch_launched_at_in_range(self): + exist = self.mox.CreateMockAnything() + exist.usage = self.mox.CreateMockAnything() + exist.launched_at = decimal.Decimal('1.0') + exist.instance_type_id = 2 + exist.usage.launched_at = decimal.Decimal('1.4') + exist.usage.instance_type_id = 2 + self.mox.ReplayAll() + + result = dbverifier._verify_for_launch(exist) + self.assertIsNone(result) + + self.mox.VerifyAll() + + def test_verify_for_launch_launched_at_missmatch(self): + exist = self.mox.CreateMockAnything() + exist.usage = self.mox.CreateMockAnything() + exist.launched_at = decimal.Decimal('1.1') + exist.instance_type_id = 2 + exist.usage.launched_at = decimal.Decimal('2.1') + exist.usage.instance_type_id = 2 + self.mox.ReplayAll() + + try: + dbverifier._verify_for_launch(exist) + self.fail() + except FieldMismatch, fm: + self.assertEqual(fm.field_name, 'launched_at') + self.assertEqual(fm.expected, decimal.Decimal('1.1')) + self.assertEqual(fm.actual, decimal.Decimal('2.1')) + + self.mox.VerifyAll() + + def test_verify_for_launch_instance_type_id_missmatch(self): + exist = self.mox.CreateMockAnything() + exist.usage = self.mox.CreateMockAnything() + exist.launched_at = decimal.Decimal('1.1') + exist.instance_type_id = 2 + exist.usage.launched_at = decimal.Decimal('1.1') + exist.usage.instance_type_id = 3 + self.mox.ReplayAll() + + try: + dbverifier._verify_for_launch(exist) + self.fail() + except FieldMismatch, fm: + self.assertEqual(fm.field_name, 'instance_type_id') + self.assertEqual(fm.expected, 2) + self.assertEqual(fm.actual, 3) + + self.mox.VerifyAll() + + def test_verify_for_launch_late_usage(self): + exist = self.mox.CreateMockAnything() + exist.usage = None + exist.instance = INSTANCE_ID_1 + exist.launched_at = decimal.Decimal('1.1') + exist.instance_type_id = 2 + results = self.mox.CreateMockAnything() + models.InstanceUsage.objects.filter(instance=INSTANCE_ID_1)\ + .AndReturn(results) + results.count().AndReturn(1) + filters = { + 'instance': INSTANCE_ID_1, + 'launched_at__gte': decimal.Decimal('1.0'), + 'launched_at__lte': decimal.Decimal('1.999999') + } + models.InstanceUsage.objects.filter(**filters).AndReturn(results) + results.count().AndReturn(1) + usage = self.mox.CreateMockAnything() + results.__getitem__(0).AndReturn(usage) + usage.launched_at = decimal.Decimal('1.1') + usage.instance_type_id = 2 + self.mox.ReplayAll() + + dbverifier._verify_for_launch(exist) + self.mox.VerifyAll() + + def test_verify_for_launch_no_usage(self): + exist = self.mox.CreateMockAnything() + exist.usage = None + exist.instance = INSTANCE_ID_1 + exist.launched_at = decimal.Decimal('1.1') + exist.instance_type_id = 2 + results = self.mox.CreateMockAnything() + models.InstanceUsage.objects.filter(instance=INSTANCE_ID_1) \ + .AndReturn(results) + results.count().AndReturn(0) + self.mox.ReplayAll() + + try: + dbverifier._verify_for_launch(exist) + self.fail() + except NotFound, nf: + self.assertEqual(nf.object_type, 'InstanceUsage') + self.assertEqual(nf.search_params, {'instance': INSTANCE_ID_1}) + + self.mox.VerifyAll() + + def test_verify_for_launch_late_ambiguous_usage(self): + exist = self.mox.CreateMockAnything() + exist.usage = None + exist.instance = INSTANCE_ID_1 + exist.launched_at = decimal.Decimal('1.1') + exist.instance_type_id = 2 + results = self.mox.CreateMockAnything() + models.InstanceUsage.objects.filter(instance=INSTANCE_ID_1) \ + .AndReturn(results) + results.count().AndReturn(1) + filters = { + 'instance': INSTANCE_ID_1, + 'launched_at__gte': decimal.Decimal('1.0'), + 'launched_at__lte': decimal.Decimal('1.999999') + } + models.InstanceUsage.objects.filter(**filters).AndReturn(results) + results.count().AndReturn(2) + self.mox.ReplayAll() + + try: + dbverifier._verify_for_launch(exist) + self.fail() + except AmbiguousResults, nf: + self.assertEqual(nf.object_type, 'InstanceUsage') + search_params = {'instance': INSTANCE_ID_1, + 'launched_at': decimal.Decimal('1.1')} + self.assertEqual(nf.search_params, search_params) + + self.mox.VerifyAll() + + def test_verify_for_delete(self): + exist = self.mox.CreateMockAnything() + exist.delete = self.mox.CreateMockAnything() + exist.launched_at = decimal.Decimal('1.1') + exist.deleted_at = decimal.Decimal('5.1') + exist.delete.launched_at = decimal.Decimal('1.1') + exist.delete.deleted_at = decimal.Decimal('5.1') + self.mox.ReplayAll() + + dbverifier._verify_for_delete(exist) + self.mox.VerifyAll() + + def test_verify_for_delete_found_delete(self): + exist = self.mox.CreateMockAnything() + exist.delete = None + exist.instance = INSTANCE_ID_1 + exist.launched_at = decimal.Decimal('1.1') + exist.deleted_at = decimal.Decimal('5.1') + filters = { + 'instance': INSTANCE_ID_1, + 'launched_at__gte': decimal.Decimal('1.0'), + 'launched_at__lte': decimal.Decimal('1.999999'), + } + results = self.mox.CreateMockAnything() + models.InstanceDeletes.objects.filter(**filters).AndReturn(results) + results.count().AndReturn(1) + delete = self.mox.CreateMockAnything() + delete.launched_at = decimal.Decimal('1.1') + delete.deleted_at = decimal.Decimal('5.1') + results.__getitem__(0).AndReturn(delete) + + self.mox.ReplayAll() + + dbverifier._verify_for_delete(exist) + self.mox.VerifyAll() + + def test_verify_for_delete_non_delete(self): + exist = self.mox.CreateMockAnything() + exist.delete = None + exist.instance = INSTANCE_ID_1 + exist.launched_at = decimal.Decimal('1.1') + exist.deleted_at = None + exist.raw = self.mox.CreateMockAnything() + exist.raw.when = decimal.Decimal('1.1') + filters = { + 'instance': INSTANCE_ID_1, + 'launched_at__gte': decimal.Decimal('1.0'), + 'launched_at__lte': decimal.Decimal('1.999999'), + 'deleted_at__lte': decimal.Decimal('1.1') + } + results = self.mox.CreateMockAnything() + models.InstanceDeletes.objects.filter(**filters).AndReturn(results) + results.count().AndReturn(0) + + self.mox.ReplayAll() + + dbverifier._verify_for_delete(exist) + self.mox.VerifyAll() + + def test_verify_for_delete_non_delete_found_deletes(self): + exist = self.mox.CreateMockAnything() + exist.delete = None + exist.instance = INSTANCE_ID_1 + exist.launched_at = decimal.Decimal('1.1') + exist.deleted_at = None + exist.raw = self.mox.CreateMockAnything() + exist.raw.when = decimal.Decimal('1.1') + filters = { + 'instance': INSTANCE_ID_1, + 'launched_at__gte': decimal.Decimal('1.0'), + 'launched_at__lte': decimal.Decimal('1.999999'), + 'deleted_at__lte': decimal.Decimal('1.1') + } + results = self.mox.CreateMockAnything() + models.InstanceDeletes.objects.filter(**filters).AndReturn(results) + results.count().AndReturn(1) + + self.mox.ReplayAll() + + try: + dbverifier._verify_for_delete(exist) + self.fail() + except VerificationException, ve: + msg = 'Found InstanceDeletes for non-delete exist' + self.assertEqual(ve.reason, msg) + + self.mox.VerifyAll() + + def test_verify_for_delete_launched_at_mismatch(self): + exist = self.mox.CreateMockAnything() + exist.delete = self.mox.CreateMockAnything() + exist.launched_at = decimal.Decimal('1.1') + exist.deleted_at = decimal.Decimal('5.1') + exist.delete.launched_at = decimal.Decimal('2.1') + exist.delete.deleted_at = decimal.Decimal('5.1') + self.mox.ReplayAll() + + try: + dbverifier._verify_for_delete(exist) + self.fail() + except FieldMismatch, fm: + self.assertEqual(fm.field_name, 'launched_at') + self.assertEqual(fm.expected, decimal.Decimal('1.1')) + self.assertEqual(fm.actual, decimal.Decimal('2.1')) + self.mox.VerifyAll() + + def test_verify_for_delete_deleted_at_mismatch(self): + exist = self.mox.CreateMockAnything() + exist.delete = self.mox.CreateMockAnything() + exist.launched_at = decimal.Decimal('1.1') + exist.deleted_at = decimal.Decimal('5.1') + exist.delete.launched_at = decimal.Decimal('1.1') + exist.delete.deleted_at = decimal.Decimal('6.1') + self.mox.ReplayAll() + + try: + dbverifier._verify_for_delete(exist) + self.fail() + except FieldMismatch, fm: + self.assertEqual(fm.field_name, 'deleted_at') + self.assertEqual(fm.expected, decimal.Decimal('5.1')) + self.assertEqual(fm.actual, decimal.Decimal('6.1')) + self.mox.VerifyAll() + + def test_verify(self): + exist = self.mox.CreateMockAnything() + exist.launched_at = decimal.Decimal('1.1') + self.mox.StubOutWithMock(dbverifier, '_verify_for_launch') + self.mox.StubOutWithMock(dbverifier, '_verify_for_delete') + self.mox.StubOutWithMock(dbverifier, '_mark_exists_failed') + self.mox.StubOutWithMock(dbverifier, '_mark_exist_verified') + dbverifier._verify_for_launch(exist) + dbverifier._verify_for_delete(exist) + dbverifier._mark_exist_verified(exist) + self.mox.ReplayAll() + dbverifier._verify(exist) + self.mox.VerifyAll() + + def test_verify(self): + exist = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(dbverifier, '_verify_for_launch') + self.mox.StubOutWithMock(dbverifier, '_verify_for_delete') + self.mox.StubOutWithMock(dbverifier, '_mark_exists_failed') + self.mox.StubOutWithMock(dbverifier, '_mark_exist_verified') + dbverifier._mark_exists_failed(exist) + self.mox.ReplayAll() + dbverifier._verify(exist) + self.mox.VerifyAll() + + def test_verify_launch_fail(self): + exist = self.mox.CreateMockAnything() + exist.launched_at = decimal.Decimal('1.1') + self.mox.StubOutWithMock(dbverifier, '_verify_for_launch') + self.mox.StubOutWithMock(dbverifier, '_verify_for_delete') + self.mox.StubOutWithMock(dbverifier, '_mark_exists_failed') + self.mox.StubOutWithMock(dbverifier, '_mark_exist_verified') + verify_exception = VerificationException('test') + dbverifier._verify_for_launch(exist).AndRaise(verify_exception) + dbverifier._mark_exists_failed(exist) + self.mox.ReplayAll() + dbverifier._verify(exist) + self.mox.VerifyAll() + + def test_verify_delete_fail(self): + exist = self.mox.CreateMockAnything() + exist.launched_at = decimal.Decimal('1.1') + self.mox.StubOutWithMock(dbverifier, '_verify_for_launch') + self.mox.StubOutWithMock(dbverifier, '_verify_for_delete') + self.mox.StubOutWithMock(dbverifier, '_mark_exists_failed') + self.mox.StubOutWithMock(dbverifier, '_mark_exist_verified') + verify_exception = VerificationException('test') + dbverifier._verify_for_launch(exist) + dbverifier._verify_for_delete(exist).AndRaise(verify_exception) + dbverifier._mark_exists_failed(exist) + self.mox.ReplayAll() + dbverifier._verify(exist) + self.mox.VerifyAll() + + def test_verify_exception_during_launch(self): + exist = self.mox.CreateMockAnything() + exist.launched_at = decimal.Decimal('1.1') + self.mox.StubOutWithMock(dbverifier, '_verify_for_launch') + self.mox.StubOutWithMock(dbverifier, '_verify_for_delete') + self.mox.StubOutWithMock(dbverifier, '_mark_exists_failed') + self.mox.StubOutWithMock(dbverifier, '_mark_exist_verified') + + dbverifier._verify_for_launch(exist).AndRaise(Exception()) + dbverifier._mark_exists_failed(exist) + self.mox.ReplayAll() + dbverifier._verify(exist) + self.mox.VerifyAll() + + def test_verify_exception_during_delete(self): + exist = self.mox.CreateMockAnything() + exist.launched_at = decimal.Decimal('1.1') + self.mox.StubOutWithMock(dbverifier, '_verify_for_launch') + self.mox.StubOutWithMock(dbverifier, '_verify_for_delete') + self.mox.StubOutWithMock(dbverifier, '_mark_exists_failed') + self.mox.StubOutWithMock(dbverifier, '_mark_exist_verified') + dbverifier._verify_for_launch(exist) + dbverifier._verify_for_delete(exist).AndRaise(Exception()) + dbverifier._mark_exists_failed(exist) + self.mox.ReplayAll() + dbverifier._verify(exist) + self.mox.VerifyAll() + + def test_verify_for_range(self): + pool = self.mox.CreateMockAnything() + when_max = datetime.datetime.utcnow() + results = self.mox.CreateMockAnything() + models.InstanceExists.objects.select_related().AndReturn(results) + models.InstanceExists.PENDING = 'pending' + models.InstanceExists.VERIFYING = 'verifying' + filters = { + 'raw__when__lte': dt.dt_to_decimal(when_max), + 'status': 'pending' + } + results.filter(**filters).AndReturn(results) + results.order_by('id').AndReturn(results) + results.count().AndReturn(2) + exist1 = self.mox.CreateMockAnything() + exist2 = self.mox.CreateMockAnything() + results.__iter__().AndReturn([exist1, exist2].__iter__()) + exist1.save() + exist2.save() + pool.apply_async(dbverifier._verify, args=(exist1,)) + pool.apply_async(dbverifier._verify, args=(exist2,)) + self.mox.ReplayAll() + dbverifier.verify_for_range(pool, when_max) + self.assertEqual(exist1.status, 'verifying') + self.assertEqual(exist2.status, 'verifying') + self.mox.VerifyAll() diff --git a/verifier/__init__.py b/verifier/__init__.py new file mode 100644 index 0000000..f8abb88 --- /dev/null +++ b/verifier/__init__.py @@ -0,0 +1,33 @@ +class VerificationException(Exception): + def __init__(self, reason): + self.reason = reason + + def __str__(self): + return self.reason + + +class NotFound(VerificationException): + def __init__(self, object_type, search_params): + self.object_type = object_type + self.search_params = search_params + self.reason = "Couldn't find %s using %s" % (self.object_type, + self.search_params) + + +class AmbiguousResults(VerificationException): + def __init__(self, object_type, search_params): + self.object_type = object_type + self.search_params = search_params + msg = "Ambiguous results for %s using %s" % (self.object_type, + self.search_params) + self.reason = msg + + +class FieldMismatch(VerificationException): + def __init__(self, field_name, expected, actual): + self.field_name = field_name + self.expected = expected + self.actual = actual + self.reason = "Expected %s to be '%s' got '%s'" % (self.field_name, + self.expected, + self.actual) diff --git a/verifier/dbverifier.py b/verifier/dbverifier.py new file mode 100644 index 0000000..b562f90 --- /dev/null +++ b/verifier/dbverifier.py @@ -0,0 +1,290 @@ +import argparse +import datetime +import logging +import os +import sys +from time import sleep + +import multiprocessing + +POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), + os.pardir, os.pardir)) +if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'stacktach')): + sys.path.insert(0, POSSIBLE_TOPDIR) + +from stacktach import models +from stacktach import datetime_to_decimal as dt +from verifier import AmbiguousResults +from verifier import FieldMismatch +from verifier import NotFound +from verifier import VerificationException + +LOG = logging.getLogger(__name__) +LOG.setLevel(logging.DEBUG) +handler = logging.handlers.TimedRotatingFileHandler('verifier.log', + when='h', interval=6, backupCount=4) +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +handler.setFormatter(formatter) +LOG.addHandler(handler) + + +def _list_exists(received_max=None, received_min=None, status=None): + params = {} + if received_max: + params['raw__when__lte'] = dt.dt_to_decimal(received_max) + if received_min: + params['raw__when__gt'] = dt.dt_to_decimal(received_min) + if status: + params['status'] = status + return models.InstanceExists.objects.select_related()\ + .filter(**params).order_by('id') + + +def _find_launch(instance, launched): + start = launched - datetime.timedelta(microseconds=launched.microsecond) + end = start + datetime.timedelta(microseconds=999999) + params = {'instance': instance, + 'launched_at__gte': dt.dt_to_decimal(start), + 'launched_at__lte': dt.dt_to_decimal(end)} + return models.InstanceUsage.objects.filter(**params) + + +def _find_delete(instance, launched, deleted_max=None): + start = launched - datetime.timedelta(microseconds=launched.microsecond) + end = start + datetime.timedelta(microseconds=999999) + params = {'instance': instance, + 'launched_at__gte': dt.dt_to_decimal(start), + 'launched_at__lte': dt.dt_to_decimal(end)} + if deleted_max: + params['deleted_at__lte'] = dt.dt_to_decimal(deleted_max) + return models.InstanceDeletes.objects.filter(**params) + + +def _mark_exist_verified(exist): + exist.status = models.InstanceExists.VERIFIED + exist.save() + + +def _mark_exists_failed(exist): + exist.status = models.InstanceExists.FAILED + exist.save() + + +def _has_field(d1, d2, field1, field2=None): + if not field2: + field2 = field1 + + return d1.get(field1) is not None and d2.get(field2) is not None + + +def _verify_simple_field(d1, d2, field1, field2=None): + if not field2: + field2 = field1 + + if not _has_field(d1, d2, field1, field2): + return False + else: + if d1[field1] != d2[field2]: + return False + + return True + + +def _verify_date_field(d1, d2, same_second=False): + if d1 and d2: + if d1 == d2: + return True + elif same_second and int(d1) == int(d2): + return True + return False + + +def _verify_for_launch(exist): + if exist.usage: + launch = exist.usage + else: + if models.InstanceUsage.objects\ + .filter(instance=exist.instance).count() > 0: + launches = _find_launch(exist.instance, + dt.dt_from_decimal(exist.launched_at)) + if launches.count() != 1: + query = { + 'instance': exist.instance, + 'launched_at': exist.launched_at + } + raise AmbiguousResults('InstanceUsage', query) + launch = launches[0] + else: + raise NotFound('InstanceUsage', {'instance': exist.instance}) + + if not _verify_date_field(launch.launched_at, exist.launched_at, + same_second=True): + raise FieldMismatch('launched_at', exist.launched_at, + launch.launched_at) + + if launch.instance_type_id != exist.instance_type_id: + raise FieldMismatch('instance_type_id', exist.instance_type_id, + launch.instance_type_id) + + +def _verify_for_delete(exist): + + delete = None + if exist.delete: + # We know we have a delete and we have it's id + delete = exist.delete + else: + if exist.deleted_at: + # We received this exists before the delete, go find it + deletes = _find_delete(exist.instance, + dt.dt_from_decimal(exist.launched_at)) + if deletes.count() == 1: + delete = deletes[0] + else: + query = { + 'instance': exist.instance, + 'launched_at': exist.launched_at + } + raise NotFound('InstanceDelete', query) + else: + # We don't know if this is supposed to have a delete or not. + # Thus, we need to check if we have a delete for this instance. + # We need to be careful though, since we could be verifying an + # exist event that we got before the delete. So, we restrict the + # search to only deletes before the time this exist was sent. + # If we find any, we fail validation + deletes = _find_delete(exist.instance, + dt.dt_from_decimal(exist.launched_at), + dt.dt_from_decimal(exist.raw.when)) + if deletes.count() > 0: + reason = 'Found InstanceDeletes for non-delete exist' + raise VerificationException(reason) + + if delete: + if not _verify_date_field(delete.launched_at, exist.launched_at, + same_second=True): + raise FieldMismatch('launched_at', exist.launched_at, + delete.launched_at) + + if not _verify_date_field(delete.deleted_at, exist.deleted_at, + same_second=True): + raise FieldMismatch('deleted_at', exist.deleted_at, + delete.deleted_at) + + +def _verify(exist): + try: + if not exist.launched_at: + raise VerificationException("Exists without a launched_at") + + _verify_for_launch(exist) + _verify_for_delete(exist) + + _mark_exist_verified(exist) + except VerificationException: + _mark_exists_failed(exist) + except Exception, e: + _mark_exists_failed(exist) + LOG.exception(e) + + +results = [] + + +def verify_for_range(pool, when_max): + exists = _list_exists(received_max=when_max, + status=models.InstanceExists.PENDING) + count = exists.count() + for exist in exists: + exist.status = models.InstanceExists.VERIFYING + exist.save() + result = pool.apply_async(_verify, args=(exist,)) + results.append(result) + + return count + + +def clean_results(): + global results + + pending = [] + finished = 0 + successful = 0 + + for result in results: + if result.ready(): + finished += 1 + if result.successful(): + successful += 1 + else: + pending.append(result) + + results = pending + errored = finished - successful + return len(results), successful, errored + + +def run(config): + pool = multiprocessing.Pool(config['pool_size']) + + tick_time = config['tick_time'] + settle_units = config['settle_units'] + settle_time = config['settle_time'] + while True: + now = datetime.datetime.utcnow() + kwargs = {settle_units: settle_time} + when_max = now - datetime.timedelta(**kwargs) + new = verify_for_range(pool, when_max) + + LOG.info("N: %s, %s" % (new, "P: %s, S: %s, E: %s" % clean_results())) + sleep(tick_time) + + +def run_once(config): + pool = multiprocessing.Pool(config['pool_size']) + + tick_time = config['tick_time'] + settle_units = config['settle_units'] + settle_time = config['settle_time'] + now = datetime.datetime.utcnow() + kwargs = {settle_units: settle_time} + when_max = now - datetime.timedelta(**kwargs) + new = verify_for_range(pool, when_max) + + LOG.info("Verifying %s exist events" % new) + while len(results) > 0: + LOG.info("P: %s, F: %s, E: %s" % clean_results()) + sleep(tick_time) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description= + "Stacktach Instance Exists Verifier") + parser.add_argument('--tick-time', + help='Time in seconds the verifier will sleep before' + 'it will check for new exists records.', + default=30) + parser.add_argument('--run-once', + help='Check database once and verify all returned' + 'exists records, then stop', + type=bool, + default=False) + parser.add_argument('--settle-time', + help='Time the verifier will wait for records to' + 'settle before it will verify them.', + default=10) + parser.add_argument('--settle-units', + help='Units for settle time', + default='minutes') + parser.add_argument('--pool-size', + help='Number of processes created to verify records', + type=int, + default=10) + args = parser.parse_args() + config = {'tick_time': args.tick_time, 'settle_time': args.settle_time, + 'settle_units': args.settle_units, 'pool_size': args.pool_size} + + if args.run_once: + run_once(config) + else: + run(config) diff --git a/verifier/start_verifier.py b/verifier/start_verifier.py new file mode 100644 index 0000000..eca22fe --- /dev/null +++ b/verifier/start_verifier.py @@ -0,0 +1,46 @@ +import json +import os +import signal +import sys + +from multiprocessing import Process + +POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), + os.pardir, os.pardir)) +if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'stacktach')): + sys.path.insert(0, POSSIBLE_TOPDIR) + +from verifier import dbverifier + +config_filename = os.environ.get('STACKTACH_VERIFIER_CONFIG', + 'stacktach_verifier_config.json') +try: + from local_settings import * + config_filename = STACKTACH_VERIFIER_CONFIG +except ImportError: + pass + +process = None + + +def kill_time(signal, frame): + print "dying ..." + if process: + process.terminate() + print "rose" + if process: + process.join() + print "bud" + sys.exit(0) + + +if __name__ == '__main__': + config = None + with open(config_filename, "r") as f: + config = json.load(f) + + process = Process(target=dbverifier.run, args=(config, )) + process.start() + signal.signal(signal.SIGINT, kill_time) + signal.signal(signal.SIGTERM, kill_time) + signal.pause() diff --git a/verifier/verifier.sh b/verifier/verifier.sh new file mode 100755 index 0000000..e199818 --- /dev/null +++ b/verifier/verifier.sh @@ -0,0 +1,26 @@ +#!/bin/bash + +WORKDIR=/srv/www/stacktach/app +DAEMON=/usr/bin/python +ARGS=$WORKDIR/verifier/start_verifier.py +PIDFILE=/var/run/stacktach_verifier.pid + +export DJANGO_SETTINGS_MODULE="settings" + +case "$1" in + start) + echo "Starting server" + cd $WORKDIR + /sbin/start-stop-daemon --start --pidfile $PIDFILE --make-pidfile -b --exec $DAEMON $ARGS + ;; + stop) + echo "Stopping server" + /sbin/start-stop-daemon --stop --pidfile $PIDFILE --verbose + ;; + *) + echo "Usage: verifier.sh {start|stop}" + exit 1 + ;; +esac + +exit 0